This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f96db3733c2 [SQL] Alter Catalog and Table statements (#36571)
f96db3733c2 is described below
commit f96db3733c2486b8cbe4a8f3fe479cdef5e75cff
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Mon Feb 9 12:01:37 2026 -0500
[SQL] Alter Catalog and Table statements (#36571)
* alter catalog
* alter table
* spotless
* spotless
* fix unparse method, cleanup
* add more tests
* add deps
* address comments: add java docs; add unit tests for unparsing; fix nits
* spotless
---
.github/trigger_files/beam_PostCommit_SQL.json | 2 +-
.github/trigger_files/beam_PreCommit_SQL.json | 2 +-
sdks/java/extensions/sql/iceberg/build.gradle | 2 +
.../provider/iceberg/IcebergAlterTableOps.java | 50 ++++
.../sql/meta/provider/iceberg/IcebergCatalog.java | 50 ++--
.../meta/provider/iceberg/IcebergMetastore.java | 16 +-
.../iceberg/BeamSqlCliIcebergAlterTest.java | 231 ++++++++++++++++++
.../extensions/sql/src/main/codegen/config.fmpp | 7 +
.../sql/src/main/codegen/includes/parserImpls.ftl | 120 +++++++++-
.../sdk/extensions/sql/impl/CatalogSchema.java | 4 +
.../sql/impl/parser/SqlAlterCatalog.java | 164 +++++++++++++
.../extensions/sql/impl/parser/SqlAlterTable.java | 259 +++++++++++++++++++++
.../sql/impl/parser/SqlCreateExternalTable.java | 2 +-
.../extensions/sql/impl/parser/SqlDdlNodes.java | 41 ++++
.../sql/impl/parser/SqlDropDatabase.java | 6 +-
.../extensions/sql/impl/parser/SqlDropTable.java | 6 +-
.../extensions/sql/impl/parser/SqlUseDatabase.java | 2 +-
.../sdk/extensions/sql/meta/catalog/Catalog.java | 3 +
.../sql/meta/catalog/InMemoryCatalog.java | 8 +-
.../sql/meta/provider/AlterTableOps.java | 45 ++++
.../sql/meta/provider/TableProvider.java | 5 +
.../sql/meta/provider/test/AlterTestTableOps.java | 101 ++++++++
.../sql/meta/provider/test/TestTableProvider.java | 21 ++
.../sql/meta/store/InMemoryMetaStore.java | 11 +
.../sdk/extensions/sql/BeamSqlCliCatalogTest.java | 93 ++++++++
.../beam/sdk/extensions/sql/BeamSqlCliTest.java | 249 ++++++++++++++++++++
.../beam/sdk/io/iceberg/IcebergCatalogConfig.java | 103 ++++++--
.../apache/beam/sdk/io/iceberg/PartitionUtils.java | 37 +++
.../beam/sdk/io/iceberg/FilterUtilsTest.java | 6 -
29 files changed, 1595 insertions(+), 51 deletions(-)
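[Editorial note] For orientation before the diff: the new statements can be driven end to end through BeamSqlCli, mirroring the tests added below. A minimal sketch (assumes an Iceberg catalog and table already created, as in BeamSqlCliIcebergAlterTest below; names are illustrative):

    import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
    import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;

    BeamSqlCli cli = new BeamSqlCli().catalogManager(new InMemoryCatalogManager());
    // SET overwrites catalog properties; RESET (or UNSET) removes them.
    cli.execute("ALTER CATALOG my_catalog SET ('foo'='bar') RESET ('type')");
    // ALTER TABLE clauses must appear in grammar order: DROP, ADD, RESET/UNSET, SET.
    cli.execute("ALTER TABLE my_table"
        + " DROP COLUMNS (col1)"
        + " ADD COLUMNS (col3 BOOLEAN COMMENT 'col3-comment')"
        + " SET ('prop1'='123')");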
diff --git a/.github/trigger_files/beam_PostCommit_SQL.json b/.github/trigger_files/beam_PostCommit_SQL.json
index 5df3841d236..e584718ac8f 100644
--- a/.github/trigger_files/beam_PostCommit_SQL.json
+++ b/.github/trigger_files/beam_PostCommit_SQL.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run ",
- "modification": 3
+ "modification": 0
}
diff --git a/.github/trigger_files/beam_PreCommit_SQL.json b/.github/trigger_files/beam_PreCommit_SQL.json
index ab4daeae234..07d1fb88996 100644
--- a/.github/trigger_files/beam_PreCommit_SQL.json
+++ b/.github/trigger_files/beam_PreCommit_SQL.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run.",
- "modification": 3
+ "modification": 0
}
diff --git a/sdks/java/extensions/sql/iceberg/build.gradle b/sdks/java/extensions/sql/iceberg/build.gradle
index d5f9e74c53b..1e319c97a8e 100644
--- a/sdks/java/extensions/sql/iceberg/build.gradle
+++ b/sdks/java/extensions/sql/iceberg/build.gradle
@@ -31,6 +31,8 @@ dependencies {
implementation project(":sdks:java:core")
implementation project(":sdks:java:managed")
implementation project(":sdks:java:io:iceberg")
+ implementation library.java.jackson_databind
+ implementation library.java.jackson_core
runtimeOnly project(":sdks:java:io:iceberg:bqms")
runtimeOnly project(":sdks:java:io:iceberg:hive")
// TODO(https://github.com/apache/beam/issues/21156): Determine how to build without this dependency
diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergAlterTableOps.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergAlterTableOps.java
new file mode 100644
index 00000000000..0c8a5519ee4
--- /dev/null
+++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergAlterTableOps.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
+import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig.IcebergTableInfo;
+import org.apache.beam.sdk.schemas.Schema;
+
+/** {@link AlterTableOps} for Iceberg tables. */
+public class IcebergAlterTableOps implements AlterTableOps {
+ private final IcebergTableInfo table;
+
+ IcebergAlterTableOps(IcebergTableInfo table) {
+ this.table = table;
+ }
+
+ @Override
+  public void updateTableProperties(Map<String, String> setProps, List<String> resetProps) {
+ table.updateTableProps(setProps, resetProps);
+ }
+
+ @Override
+  public void updateSchema(List<Schema.Field> columnsToAdd, Collection<String> columnsToDrop) {
+ table.updateSchema(columnsToAdd, columnsToDrop);
+ }
+
+ @Override
+ public void updatePartitionSpec(
+ List<String> partitionsToAdd, Collection<String> partitionsToDrop) {
+ table.updatePartitionSpec(partitionsToAdd, partitionsToDrop);
+ }
+}
diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
index 7dee72511e8..6330f550a2b 100644
--- a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
+++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
@@ -32,28 +32,11 @@ public class IcebergCatalog extends InMemoryCatalog {
// other SDKs can make use of it too
private static final String BEAM_HADOOP_PREFIX = "beam.catalog.hadoop";
private final Map<String, IcebergMetastore> metaStores = new HashMap<>();
- @VisibleForTesting final IcebergCatalogConfig catalogConfig;
+ @VisibleForTesting IcebergCatalogConfig catalogConfig;
- public IcebergCatalog(String name, Map<String, String> properties) {
- super(name, properties);
-
- ImmutableMap.Builder<String, String> catalogProps = ImmutableMap.builder();
- ImmutableMap.Builder<String, String> hadoopProps = ImmutableMap.builder();
-
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- if (entry.getKey().startsWith(BEAM_HADOOP_PREFIX)) {
- hadoopProps.put(entry.getKey(), entry.getValue());
- } else {
- catalogProps.put(entry.getKey(), entry.getValue());
- }
- }
-
- catalogConfig =
- IcebergCatalogConfig.builder()
- .setCatalogName(name)
- .setCatalogProperties(catalogProps.build())
- .setConfigProperties(hadoopProps.build())
- .build();
+ public IcebergCatalog(String name, Map<String, String> props) {
+ super(name, props);
+ catalogConfig = initConfig(name, props);
}
@Override
@@ -67,6 +50,12 @@ public class IcebergCatalog extends InMemoryCatalog {
return "iceberg";
}
+ @Override
+  public void updateProperties(Map<String, String> setProps, Collection<String> resetProps) {
+ super.updateProperties(setProps, resetProps);
+ catalogConfig = initConfig(name(), properties());
+ }
+
@Override
public boolean createDatabase(String database) {
return catalogConfig.createNamespace(database);
@@ -97,4 +86,23 @@ public class IcebergCatalog extends InMemoryCatalog {
}
return removed;
}
+
+  private static IcebergCatalogConfig initConfig(String name, Map<String, String> properties) {
+ ImmutableMap.Builder<String, String> catalogProps = ImmutableMap.builder();
+ ImmutableMap.Builder<String, String> hadoopProps = ImmutableMap.builder();
+
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ if (entry.getKey().startsWith(BEAM_HADOOP_PREFIX)) {
+ hadoopProps.put(entry.getKey(), entry.getValue());
+ } else {
+ catalogProps.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ return IcebergCatalogConfig.builder()
+ .setCatalogName(name)
+ .setCatalogProperties(catalogProps.build())
+ .setConfigProperties(hadoopProps.build())
+ .build();
+ }
}
diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java
index b73aa25c7a2..678c76153c5 100644
--- a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java
+++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import com.fasterxml.jackson.core.type.TypeReference;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.TableUtils;
@@ -60,7 +61,11 @@ public class IcebergMetastore extends InMemoryMetaStore {
} else {
String identifier = getIdentifier(table);
try {
-      catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields());
+ Map<String, String> properties =
+ TableUtils.getObjectMapper()
+                .convertValue(table.getProperties(), new TypeReference<Map<String, String>>() {});
+ catalogConfig.createTable(
+            identifier, table.getSchema(), table.getPartitionFields(), properties);
} catch (TableAlreadyExistsException e) {
LOG.info(
"Iceberg table '{}' already exists at location '{}'.",
table.getName(), identifier);
@@ -147,6 +152,15 @@ public class IcebergMetastore extends InMemoryMetaStore {
return getProvider(table.getType()).supportsPartitioning(table);
}
+ @Override
+ public IcebergAlterTableOps alterTable(String name) {
+ IcebergTableInfo table =
+ checkStateNotNull(
+            catalogConfig.loadTable(getIdentifier(name)), "Could not find table '%s'", name);
+
+ return new IcebergAlterTableOps(table);
+ }
+
@Override
public void registerProvider(TableProvider provider) {
super.registerProvider(provider);
diff --git a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergAlterTest.java b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergAlterTest.java
new file mode 100644
index 00000000000..b6fd2b14a5e
--- /dev/null
+++ b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergAlterTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
+
+import static java.lang.String.format;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+/** Unit tests specifically for ALTERing Iceberg catalogs and tables. */
+public class BeamSqlCliIcebergAlterTest {
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+ private InMemoryCatalogManager catalogManager;
+ private BeamSqlCli cli;
+ private String warehouse;
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Before
+ public void setup() throws IOException {
+ catalogManager = new InMemoryCatalogManager();
+ cli = new BeamSqlCli().catalogManager(catalogManager);
+ File warehouseFile = TEMPORARY_FOLDER.newFolder();
+ assertTrue(warehouseFile.delete());
+ warehouse = "file:" + warehouseFile + "/" + UUID.randomUUID();
+ }
+
+ private String createCatalog(String name) {
+ return format("CREATE CATALOG %s \n", name)
+ + "TYPE iceberg \n"
+ + "PROPERTIES (\n"
+ + " 'type' = 'hadoop', \n"
+ + format(" 'warehouse' = '%s')", warehouse);
+ }
+
+ @Test
+ public void testAlterCatalog() {
+ cli.execute(createCatalog("my_catalog"));
+ IcebergCatalog catalog =
+        (IcebergCatalog) checkStateNotNull(catalogManager.getCatalog("my_catalog"));
+    Map<String, String> expected = ImmutableMap.of("type", "hadoop", "warehouse", warehouse);
+ assertEquals(expected, catalog.properties());
+ assertEquals(expected, catalog.catalogConfig.getCatalogProperties());
+
+    cli.execute("ALTER CATALOG my_catalog SET ('abc'='123', 'foo'='bar') RESET ('type')");
+    expected = ImmutableMap.of("warehouse", warehouse, "abc", "123", "foo", "bar");
+ assertEquals(expected, catalog.properties());
+ assertEquals(expected, catalog.catalogConfig.getCatalogProperties());
+ }
+
+ @Test
+ public void testAlterTableProps() {
+ cli.execute(createCatalog("my_catalog"));
+ cli.execute("CREATE DATABASE my_catalog.my_db");
+ cli.execute("USE DATABASE my_catalog.my_db");
+ cli.execute(
+        "CREATE EXTERNAL TABLE my_table(col1 VARCHAR, col2 INTEGER) TYPE 'iceberg' TBLPROPERTIES '{ \"prop1\" : \"123\", \"prop2\" : \"abc\"}'");
+ IcebergCatalog catalog = (IcebergCatalog) catalogManager.currentCatalog();
+ Table table =
+        catalog.catalogConfig.catalog().loadTable(TableIdentifier.parse("my_db.my_table"));
+    assertThat(table.properties(), allOf(hasEntry("prop1", "123"), hasEntry("prop2", "abc")));
+
+ cli.execute("ALTER TABLE my_table SET('prop3'='foo')");
+ table.refresh();
+ assertThat(
+ table.properties(),
+        allOf(hasEntry("prop1", "123"), hasEntry("prop2", "abc"), hasEntry("prop3", "foo")));
+
+ cli.execute("ALTER TABLE my_table RESET ('prop1') SET ('prop2'='xyz')");
+ table.refresh();
+    assertThat(table.properties(), allOf(hasEntry("prop2", "xyz"), hasEntry("prop3", "foo")));
+ }
+
+ @Test
+ public void testAlterTableSchema() {
+ cli.execute(createCatalog("my_catalog"));
+ cli.execute("CREATE DATABASE my_catalog.my_db");
+ cli.execute("USE DATABASE my_catalog.my_db");
+    cli.execute("CREATE EXTERNAL TABLE my_table(col1 VARCHAR, col2 INTEGER) TYPE 'iceberg'");
+ IcebergCatalog catalog = (IcebergCatalog) catalogManager.currentCatalog();
+ Table table =
+        catalog.catalogConfig.catalog().loadTable(TableIdentifier.parse("my_db.my_table"));
+ Schema actualSchema = table.schema();
+ Schema expectedSchema =
+ new Schema(
+ optional(1, "col1", Types.StringType.get()),
+ optional(2, "col2", Types.IntegerType.get()));
+ assertTrue(
+        String.format("Unequal schemas.\nExpected: %s\nActual: %s", expectedSchema, actualSchema),
+ expectedSchema.sameSchema(actualSchema));
+
+ // add some columns
+ cli.execute(
+        "ALTER TABLE my_table ADD COLUMNS (col3 BOOLEAN COMMENT 'col3-comment', col4 FLOAT COMMENT 'col4-comment')");
+ table.refresh();
+ actualSchema = table.schema();
+ expectedSchema =
+ new Schema(
+ optional(1, "col1", Types.StringType.get()),
+ optional(2, "col2", Types.IntegerType.get()),
+ optional(3, "col3", Types.BooleanType.get(), "col3-comment"),
+ optional(4, "col4", Types.FloatType.get(), "col4-comment"));
+ assertTrue(
+        String.format("Unequal schemas.\nExpected: %s\nActual: %s", expectedSchema, actualSchema),
+ expectedSchema.sameSchema(actualSchema));
+
+ // remove some columns and add other columns
+ cli.execute(
+        "ALTER TABLE my_table DROP COLUMNS (col1, col2, col3) ADD COLUMNS (colA VARCHAR, colB INTEGER)");
+ table.refresh();
+ actualSchema = table.schema();
+ expectedSchema =
+ new Schema(
+ optional(4, "col4", Types.FloatType.get(), "col4-comment"),
+ optional(5, "colA", Types.StringType.get()),
+ optional(6, "colB", Types.IntegerType.get()));
+ assertTrue(
+        String.format("Unequal schemas.\nExpected: %s\nActual: %s", expectedSchema, actualSchema),
+ expectedSchema.sameSchema(actualSchema));
+ }
+
+ @Test
+ public void testAlterTableSchemaFailsHelpfullyWhenAddingRequiredColumns() {
+    // adding required columns is not yet supported because Beam Schemas do not
+    // allow specifying a 'default value' for fields. This concept is required when adding a new
+    // Iceberg column because it allows previously written rows to default to this value.
+ cli.execute(createCatalog("my_catalog"));
+ cli.execute("CREATE DATABASE my_catalog.my_db");
+ cli.execute("USE DATABASE my_catalog.my_db");
+    cli.execute("CREATE EXTERNAL TABLE my_table(col1 VARCHAR, col2 INTEGER) TYPE 'iceberg'");
+
+ thrown.expect(UnsupportedOperationException.class);
+ thrown.expectMessage(
+        "Adding required columns is not yet supported. Encountered required columns: [col3]");
+ cli.execute("ALTER TABLE my_table ADD COLUMNS (col3 BOOLEAN NOT NULL)");
+ }
+
+ @Test
+ public void testAlterTablePartitionSpec() {
+ cli.execute(createCatalog("my_catalog"));
+ cli.execute("CREATE DATABASE my_catalog.my_db");
+ cli.execute("USE DATABASE my_catalog.my_db");
+ cli.execute(
+        "CREATE EXTERNAL TABLE my_table(col1 VARCHAR, col2 INTEGER, col3 FLOAT, col4 BOOLEAN, col5 TIMESTAMP) "
+ + "TYPE 'iceberg' PARTITIONED BY ('col3', 'bucket(col2, 3)')");
+ IcebergCatalog catalog = (IcebergCatalog) catalogManager.currentCatalog();
+ Table table =
+        catalog.catalogConfig.catalog().loadTable(TableIdentifier.parse("my_db.my_table"));
+ PartitionSpec actualSpec = table.spec();
+ PartitionSpec expectedSpec =
+        PartitionSpec.builderFor(table.schema()).identity("col3").bucket("col2", 3).build();
+ assertTrue(
+ String.format(
+ "Partition specs are not compatible.\nExpected: %s\nActual: %s",
+ expectedSpec, actualSpec),
+ expectedSpec.compatibleWith(actualSpec));
+
+ // add some partitions
+ cli.execute("ALTER TABLE my_table ADD PARTITIONS ('col4', 'month(col5)')");
+ table.refresh();
+ actualSpec = table.spec();
+ expectedSpec =
+ PartitionSpec.builderFor(table.schema())
+ .identity("col3")
+ .bucket("col2", 3)
+ .identity("col4")
+ .month("col5")
+ .withSpecId(table.spec().specId())
+ .build();
+ assertTrue(
+ String.format(
+ "Partition specs are not compatible.\nExpected: %s\nActual: %s",
+ expectedSpec, actualSpec),
+ expectedSpec.compatibleWith(actualSpec));
+
+ // remove some partitions and add other partitions
+ cli.execute(
+        "ALTER TABLE my_table DROP PARTITIONS ('month(col5)', 'bucket(col2, 3)') ADD PARTITIONS ('hour(col5)')");
+ table.refresh();
+ actualSpec = table.spec();
+ expectedSpec =
+ PartitionSpec.builderFor(table.schema())
+ .identity("col3")
+ .identity("col4")
+ .hour("col5")
+ .withSpecId(table.spec().specId())
+ .build();
+ assertTrue(
+ String.format(
+ "Partition specs are not compatible.\nExpected: %s\nActual: %s",
+ expectedSpec, actualSpec),
+ expectedSpec.compatibleWith(actualSpec));
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/codegen/config.fmpp b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
index 73af7e18150..c3692430610 100644
--- a/sdks/java/extensions/sql/src/main/codegen/config.fmpp
+++ b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
@@ -36,6 +36,8 @@ data: {
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlUseCatalog"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlUseDatabase"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlSetOptionBeam"
+ "org.apache.beam.sdk.extensions.sql.impl.parser.SqlAlterCatalog"
+ "org.apache.beam.sdk.extensions.sql.impl.parser.SqlAlterTable"
"org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils"
"org.apache.beam.sdk.schemas.Schema"
]
@@ -53,7 +55,10 @@ data: {
"CATALOGS"
"DATABASES"
"TABLES"
+ "COLUMNS"
+ "PARTITIONS"
"USE"
+ "UNSET"
]
# List of keywords from "keywords" section that are not reserved.
@@ -432,6 +437,8 @@ data: {
"SqlUseCatalog(Span.of(), null)"
"SqlUseDatabase(Span.of(), null)"
"SqlSetOptionBeam(Span.of(), null)"
+ "SqlAlterCatalog(Span.of(), null)"
+ "SqlAlterTable(Span.of(), null)"
]
# List of methods for parsing custom literals.
diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
index d3bb8c2af56..94c0161c492 100644
--- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
+++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
@@ -178,6 +178,21 @@ SqlNode Property() :
}
}
+SqlNodeList ArgList() :
+{
+ SqlNodeList list = new SqlNodeList(getPos());
+ SqlNode property;
+}
+{
+ property = StringLiteral() { list.add(property); }
+ (
+ <COMMA> property = StringLiteral() { list.add(property); }
+ )*
+ {
+ return list;
+ }
+}
+
/**
* CREATE CATALOG ( IF NOT EXISTS )? catalog_name
* TYPE type_name
@@ -247,6 +262,41 @@ SqlCall SqlUseCatalog(Span s, String scope) :
}
+/**
+ * ALTER CATALOG catalog_name
+ * [ SET (key1=val1, key2=val2, ...) ]
+ * [ (RESET | UNSET) (key1, key2, ...) ]
+ */
+SqlCall SqlAlterCatalog(Span s, String scope) :
+{
+ final SqlNode catalogName;
+ SqlNodeList setProps = null;
+ SqlNodeList resetProps = null;
+}
+{
+ <ALTER> {
+ s.add(this);
+ }
+ <CATALOG>
+ (
+ catalogName = CompoundIdentifier()
+ |
+ catalogName = StringLiteral()
+ )
+ [ <SET> <LPAREN> setProps = PropertyList() <RPAREN> ]
+ [ (<RESET> | <UNSET>) <LPAREN> resetProps = ArgList() <RPAREN> ]
+
+ {
+ return new SqlAlterCatalog(
+ s.end(this),
+ scope,
+ catalogName,
+ setProps,
+ resetProps);
+ }
+}
+
+
SqlDrop SqlDropCatalog(Span s, boolean replace) :
{
final boolean ifExists;
@@ -464,6 +514,18 @@ SqlCall SqlShowCurrent(Span s) :
}
}
+SqlNodeList PartitionFieldsParens() :
+{
+ final SqlNodeList partitions;
+}
+{
+ <LPAREN>
+ partitions = PartitionFieldList()
+ <RPAREN>
+ {
+ return partitions;
+ }
+}
SqlNodeList PartitionFieldList() :
{
@@ -517,7 +579,7 @@ SqlCreate SqlCreateExternalTable(Span s, boolean replace) :
|
type = SimpleIdentifier()
)
-    [ <PARTITIONED> <BY> <LPAREN> partitionFields = PartitionFieldList() <RPAREN> ]
+ [ <PARTITIONED> <BY> partitionFields = PartitionFieldsParens() ]
[ <COMMENT> comment = StringLiteral() ]
[ <LOCATION> location = StringLiteral() ]
[ <TBLPROPERTIES> tblProperties = StringLiteral() ]
@@ -537,6 +599,62 @@ SqlCreate SqlCreateExternalTable(Span s, boolean replace) :
}
}
+/**
+ * Loosely following Flink's grammar: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sql/alter/#alter-table
+ * ALTER TABLE table_name
+ * [ ADD COLUMNS <column_def, column_def, ...> ]
+ * [ DROP COLUMNS <column_name, column_name> ]
+ * [ ADD PARTITIONS <partition_field, partition_field, ...> ]
+ * [ DROP PARTITIONS <partition_field, partition_field, ...> ]
+ * [ SET (key1=val1, key2=val2, ...) ]
+ * [ (RESET | UNSET) (key1, key2, ...) ]
+ */
+SqlCall SqlAlterTable(Span s, String scope) :
+{
+ final SqlNode tableName;
+ SqlNodeList columnsToDrop = null;
+ List<Schema.Field> columnsToAdd = null;
+ SqlNodeList partitionsToDrop = null;
+ SqlNodeList partitionsToAdd = null;
+ SqlNodeList setProps = null;
+ SqlNodeList resetProps = null;
+}
+{
+ <ALTER> {
+ s.add(this);
+ }
+ <TABLE>
+ tableName = CompoundIdentifier()
+
+ [ <DROP> (
+ <COLUMNS> columnsToDrop = ParenthesizedSimpleIdentifierList()
+ |
+ <PARTITIONS> partitionsToDrop = ParenthesizedLiteralOptionCommaList()
+ ) ]
+
+ [ <ADD> (
+ <COLUMNS> columnsToAdd = FieldListParens()
+ |
+ <PARTITIONS> partitionsToAdd = ParenthesizedLiteralOptionCommaList()
+ ) ]
+
+ [ (<RESET> | <UNSET>) <LPAREN> resetProps = ArgList() <RPAREN> ]
+ [ <SET> <LPAREN> setProps = PropertyList() <RPAREN> ]
+
+ {
+ return new SqlAlterTable(
+ s.end(this),
+ scope,
+ tableName,
+ columnsToAdd,
+ columnsToDrop,
+ partitionsToAdd,
+ partitionsToDrop,
+ setProps,
+ resetProps);
+ }
+}
+
SqlCreate SqlCreateFunction(Span s, boolean replace) :
{
boolean isAggregate = false;
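[Editorial note] For illustration, statements the two new productions above accept, reusing the BeamSqlCli handle from the sketch near the top (identifiers and property keys are hypothetical; clause order is fixed by the grammar):

    // ALTER CATALOG: optional SET, then optional RESET/UNSET.
    cli.execute("ALTER CATALOG my_catalog SET ('warehouse'='/tmp/wh') UNSET ('foo')");
    // ALTER TABLE: optional DROP, ADD, RESET/UNSET, SET, in that order.
    cli.execute("ALTER TABLE my_db.my_table"
        + " DROP PARTITIONS ('bucket(col2, 3)')"
        + " ADD PARTITIONS ('hour(col5)')"
        + " SET ('k'='v')");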
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogSchema.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogSchema.java
index 792e5b98bcd..e532355d856 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogSchema.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogSchema.java
@@ -68,6 +68,10 @@ public class CatalogSchema implements Schema {
return catalog;
}
+  public void updateProperties(Map<String, String> setProps, Collection<String> resetProps) {
+ catalog.updateProperties(setProps, resetProps);
+ }
+
public @Nullable BeamCalciteSchema getCurrentDatabaseSchema() {
return getSubSchema(catalog.currentDatabase());
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterCatalog.java
new file mode 100644
index 00000000000..a67b5f89242
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterCatalog.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import static org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogManagerSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogSchema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlAlter;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlWriter;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SqlAlterCatalog extends SqlAlter implements BeamSqlParser.ExecutableStatement {
+ private static final SqlOperator OPERATOR =
+ new SqlSpecialOperator("ALTER CATALOG", SqlKind.OTHER_DDL);
+ private final SqlIdentifier name;
+ private final @Nullable SqlNodeList setProps;
+ private final @Nullable SqlNodeList resetProps;
+
+ /**
+ * Called by the auto-generated {@link
+   * org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl}. Check SqlAlterCatalog
+   * in `sql/src/main/codegen/includes/parserImpls.ftl` to see the corresponding SQL syntax
+ */
+ public SqlAlterCatalog(
+ SqlParserPos pos,
+ @Nullable String scope,
+ SqlNode name,
+ @Nullable SqlNodeList setProps,
+ @Nullable SqlNodeList resetProps) {
+ super(pos, scope);
+ this.name = SqlDdlNodes.getIdentifier(name, pos);
+ this.setProps = setProps;
+ this.resetProps = resetProps;
+ }
+
+ @Override
+ public void execute(CalcitePrepare.Context context) {
+    final Pair<CalciteSchema, String> pair = SqlDdlNodes.schema(context, true, name);
+ Schema schema = pair.left.schema;
+
+ if (!(schema instanceof CatalogManagerSchema)) {
+ throw SqlUtil.newContextException(
+ name.getParserPosition(),
+ RESOURCE.internal(
+ "Attempting to alter catalog '"
+ + SqlDdlNodes.name(name)
+ + "' with unexpected Calcite Schema of type "
+ + schema.getClass()));
+ }
+
+ CatalogSchema catalogSchema =
+        ((CatalogManagerSchema) schema).getCatalogSchema(SqlDdlNodes.getString(name));
+
+ Map<String, String> setPropsMap = SqlDdlNodes.getStringMap(setProps);
+ Collection<String> resetPropsList = SqlDdlNodes.getStringList(resetProps);
+
+    ImmutableList.Builder<String> overlappingPropsBuilder = ImmutableList.builder();
+    resetPropsList.stream().filter(setPropsMap::containsKey).forEach(overlappingPropsBuilder::add);
+ List<String> overlappingProps = overlappingPropsBuilder.build();
+ checkState(
+ overlappingProps.isEmpty(),
+        "Invalid %s call: Found overlapping properties between SET and RESET: %s.",
+ OPERATOR,
+ overlappingProps);
+
+ catalogSchema.updateProperties(setPropsMap, resetPropsList);
+ }
+
+ @Override
+ public void unparseAlterOperation(SqlWriter writer, int left, int right) {
+ unparse(writer, left, right);
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int left, int right) {
+ writer.keyword("ALTER");
+ writer.keyword("CATALOG");
+ name.unparse(writer, left, right);
+ if (setProps != null && !setProps.isEmpty()) {
+ writer.keyword("SET");
+ writer.keyword("(");
+ for (int i = 0; i < setProps.size(); i++) {
+ if (i > 0) {
+ writer.keyword(",");
+ }
+ SqlNode property = setProps.get(i);
+ checkState(
+ property instanceof SqlNodeList,
+ String.format(
+              "Unexpected properties entry '%s' of class '%s'", property, property.getClass()));
+ SqlNodeList kv = ((SqlNodeList) property);
+
+ kv.get(0).unparse(writer, left, right); // key
+ writer.keyword("=");
+ kv.get(1).unparse(writer, left, right); // value
+ }
+ writer.keyword(")");
+ }
+
+ if (resetProps != null) {
+ writer.keyword("RESET");
+ writer.keyword("(");
+ for (int i = 0; i < resetProps.size(); i++) {
+ if (i > 0) {
+ writer.keyword(",");
+ }
+ SqlNode field = resetProps.get(i);
+ field.unparse(writer, 0, 0);
+ }
+ writer.keyword(")");
+ }
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ ImmutableList.Builder<SqlNode> operands = ImmutableList.builder();
+ operands.add(name);
+ if (setProps != null) {
+ operands.add(setProps);
+ }
+ if (resetProps != null) {
+ operands.add(resetProps);
+ }
+ return operands.build();
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterTable.java
new file mode 100644
index 00000000000..4c4fa4f8f1d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterTable.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import static org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogManagerSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogSchema;
+import org.apache.beam.sdk.extensions.sql.impl.TableName;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlAlter;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlWriter;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SqlAlterTable extends SqlAlter implements BeamSqlParser.ExecutableStatement {
+ private static final SqlOperator OPERATOR =
+ new SqlSpecialOperator("ALTER TABLE", SqlKind.ALTER_TABLE);
+ private final SqlIdentifier name;
+ private final @Nullable List<Field> columnsToAdd;
+ private final @Nullable SqlNodeList columnsToDrop;
+ private final @Nullable SqlNodeList partitionsToAdd;
+ private final @Nullable SqlNodeList partitionsToDrop;
+ private final @Nullable SqlNodeList setProps;
+ private final @Nullable SqlNodeList resetProps;
+
+ /**
+ * Called by the auto-generated {@link
+   * org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl}. Check SqlAlterTable in
+   * `sql/src/main/codegen/includes/parserImpls.ftl` to see the corresponding SQL syntax
+ */
+ public SqlAlterTable(
+ SqlParserPos pos,
+ @Nullable String scope,
+ SqlNode name,
+ @Nullable List<Field> columnsToAdd,
+ @Nullable SqlNodeList columnsToDrop,
+ @Nullable SqlNodeList partitionsToAdd,
+ @Nullable SqlNodeList partitionsToDrop,
+ @Nullable SqlNodeList setProps,
+ @Nullable SqlNodeList resetProps) {
+ super(pos, scope);
+ this.name = SqlDdlNodes.getIdentifier(name, pos);
+ this.columnsToAdd = columnsToAdd;
+ this.columnsToDrop = columnsToDrop;
+ this.partitionsToAdd = partitionsToAdd;
+ this.partitionsToDrop = partitionsToDrop;
+ this.setProps = setProps;
+ this.resetProps = resetProps;
+ }
+
+ @Override
+ public void execute(CalcitePrepare.Context context) {
+    final Pair<CalciteSchema, String> pair = SqlDdlNodes.schema(context, true, name);
+ TableName pathOverride = TableName.create(name.toString());
+ Schema schema = pair.left.schema;
+
+ BeamCalciteSchema beamCalciteSchema;
+ if (schema instanceof CatalogManagerSchema) {
+      CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
+ CatalogSchema catalogSchema =
+ pathOverride.catalog() != null
+ ? catalogManagerSchema.getCatalogSchema(pathOverride)
+ : catalogManagerSchema.getCurrentCatalogSchema();
+ beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride);
+ } else if (schema instanceof BeamCalciteSchema) {
+ beamCalciteSchema = (BeamCalciteSchema) schema;
+ } else {
+ throw SqlUtil.newContextException(
+ name.getParserPosition(),
+ RESOURCE.internal(
+              "Attempting to alter a table using unexpected Calcite Schema of type "
+ + schema.getClass()));
+ }
+
+ if (beamCalciteSchema.getTable(pair.right) == null) {
+ // Table does not exist.
+ throw SqlUtil.newContextException(
+ name.getParserPosition(), RESOURCE.tableNotFound(name.toString()));
+ }
+
+ Map<String, String> setPropsMap = SqlDdlNodes.getStringMap(setProps);
+ List<String> resetPropsList = SqlDdlNodes.getStringList(resetProps);
+ List<String> columnsToDropList = SqlDdlNodes.getStringList(columnsToDrop);
+    List<String> partitionsToAddList = SqlDdlNodes.getStringList(partitionsToAdd);
+    List<String> partitionsToDropList = SqlDdlNodes.getStringList(partitionsToDrop);
+
+ AlterTableOps alterOps =
+        beamCalciteSchema.getTableProvider().alterTable(SqlDdlNodes.name(name));
+
+ if (!setPropsMap.isEmpty() || !resetPropsList.isEmpty()) {
+ validateNonOverlappingProps(setPropsMap, resetPropsList);
+
+ alterOps.updateTableProperties(setPropsMap, resetPropsList);
+ }
+    if (!columnsToDropList.isEmpty() || (columnsToAdd != null && !columnsToAdd.isEmpty())) {
+      alterOps.updateSchema(firstNonNull(columnsToAdd, Collections.emptyList()), columnsToDropList);
+ }
+ if (!partitionsToDropList.isEmpty() || !partitionsToAddList.isEmpty()) {
+ alterOps.updatePartitionSpec(partitionsToAddList, partitionsToDropList);
+ }
+ }
+
+ private void validateNonOverlappingProps(
+ Map<String, String> setPropsMap, Collection<String> resetPropsList) {
+    ImmutableList.Builder<String> overlappingPropsBuilder = ImmutableList.builder();
+
+    resetPropsList.stream().filter(setPropsMap::containsKey).forEach(overlappingPropsBuilder::add);
+
+ List<String> overlappingProps = overlappingPropsBuilder.build();
+ checkState(
+ overlappingProps.isEmpty(),
+        "Invalid '%s' call: Found overlapping properties between SET and RESET: %s.",
+ OPERATOR,
+ overlappingProps);
+ }
+
+ @Override
+ public void unparseAlterOperation(SqlWriter writer, int left, int right) {
+ unparse(writer, left, right);
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int left, int right) {
+ writer.keyword("ALTER");
+ writer.keyword("TABLE");
+ name.unparse(writer, left, right);
+
+ if (columnsToDrop != null && !columnsToDrop.isEmpty()) {
+ writer.keyword("DROP COLUMNS");
+ SqlWriter.Frame frame = writer.startList("(", ")");
+ for (String colName : SqlDdlNodes.getStringList(columnsToDrop)) {
+ writer.sep(",");
+ writer.identifier(colName, false);
+ }
+ writer.endList(frame);
+ }
+
+ if (columnsToAdd != null && !columnsToAdd.isEmpty()) {
+ writer.keyword("ADD COLUMNS");
+ SqlWriter.Frame frame = writer.startList("(", ")");
+ columnsToAdd.forEach(column -> unparseColumn(writer, column));
+ writer.endList(frame);
+ }
+
+ if (partitionsToDrop != null && !partitionsToDrop.isEmpty()) {
+ writer.keyword("DROP PARTITIONS");
+ SqlWriter.Frame frame = writer.startList("(", ")");
+ for (String partition : SqlDdlNodes.getStringList(partitionsToDrop)) {
+ writer.sep(",");
+ writer.identifier(partition, true);
+ }
+ writer.endList(frame);
+ }
+
+ if (partitionsToAdd != null && !partitionsToAdd.isEmpty()) {
+ writer.keyword("ADD PARTITIONS");
+ SqlWriter.Frame frame = writer.startList("(", ")");
+ for (String partition : SqlDdlNodes.getStringList(partitionsToAdd)) {
+ writer.sep(",");
+ writer.identifier(partition, true);
+ }
+ writer.endList(frame);
+ }
+
+ if (resetProps != null && !resetProps.isEmpty()) {
+ writer.keyword("RESET");
+ SqlWriter.Frame frame = writer.startList("(", ")");
+ for (SqlNode resetProp : resetProps) {
+ writer.sep(",");
+ resetProp.unparse(writer, 0, 0);
+ }
+ writer.endList(frame);
+ }
+
+ if (setProps != null && !setProps.isEmpty()) {
+ writer.keyword("SET");
+ SqlWriter.Frame frame = writer.startList("(", ")");
+ for (SqlNode setProp : setProps) {
+ writer.sep(",");
+ SqlNodeList kv = (SqlNodeList) setProp;
+ kv.get(0).unparse(writer, left, right); // key
+ writer.keyword("=");
+ kv.get(1).unparse(writer, left, right); // value
+ }
+ writer.endList(frame);
+ }
+ }
+
+ private void unparseColumn(SqlWriter writer, Field column) {
+ writer.sep(",");
+ writer.identifier(column.getName(), false);
+ writer.keyword(CalciteUtils.toSqlTypeName(column.getType()).name());
+
+    if (column.getType().getNullable() != null && !column.getType().getNullable()) {
+ writer.keyword("NOT NULL");
+ }
+
+ if (!Strings.isNullOrEmpty(column.getDescription())) {
+ writer.keyword("COMMENT");
+ writer.identifier(column.getDescription(), true);
+ }
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ ImmutableList.Builder<SqlNode> operands = ImmutableList.builder();
+ operands.add(name);
+ if (setProps != null) {
+ operands.add(setProps);
+ }
+ if (resetProps != null) {
+ operands.add(resetProps);
+ }
+ return operands.build();
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
index de7903897b6..ab644145b4f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
@@ -159,7 +159,7 @@ public class SqlCreateExternalTable extends SqlCreate implements BeamSqlParser.E
       CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
       catalogManagerSchema.maybeRegisterProvider(pathOverride, SqlDdlNodes.getString(type));
-      CatalogSchema catalogSchema = ((CatalogManagerSchema) schema).getCatalogSchema(pathOverride);
+      CatalogSchema catalogSchema = catalogManagerSchema.getCatalogSchema(pathOverride);
beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride);
} else if (schema instanceof BeamCalciteSchema) {
beamCalciteSchema = (BeamCalciteSchema) schema;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
index 6f4d8ee79d9..6d6be5d5a12 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
@@ -19,18 +19,24 @@ package org.apache.beam.sdk.extensions.sql.impl.parser;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlLiteral;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.NlsString;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Util;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
/** Utilities concerning {@link SqlNode} for DDL. */
@@ -97,6 +103,41 @@ public class SqlDdlNodes {
return literalValue == null ? null : literalValue.getValue();
}
+ static List<String> getStringList(@Nullable SqlNodeList l) {
+ if (l == null || l.isEmpty()) {
+ return Collections.emptyList();
+ }
+ ImmutableList.Builder<String> resetPropsList = ImmutableList.builder();
+ for (SqlNode propNode : l) {
+ @Nullable String prop = SqlDdlNodes.getString(propNode);
+ if (prop != null) {
+ resetPropsList.add(prop);
+ }
+ }
+ return resetPropsList.build();
+ }
+
+ static Map<String, String> getStringMap(@Nullable SqlNodeList nodeList) {
+ if (nodeList == null || nodeList.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ for (SqlNode property : nodeList) {
+ checkState(
+ property instanceof SqlNodeList,
+ String.format(
+            "Unexpected properties entry '%s' of class '%s'", property, property.getClass()));
+ SqlNodeList kv = ((SqlNodeList) property);
+      checkState(kv.size() == 2, "Expected 2 items in properties entry, but got %s", kv.size());
+ String key = checkStateNotNull(SqlDdlNodes.getString(kv.get(0)));
+ String value = checkStateNotNull(SqlDdlNodes.getString(kv.get(1)));
+ builder.put(key, value);
+ }
+
+ return builder.build();
+ }
+
static SqlIdentifier getIdentifier(SqlNode n, SqlParserPos pos) {
if (n instanceof SqlIdentifier) {
return (SqlIdentifier) n;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java
index 4b838c9f418..4d4d0343c73 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java
@@ -84,7 +84,11 @@ public class SqlDropDatabase extends SqlDrop implements BeamSqlParser.Executable
List<String> components =
Lists.newArrayList(Splitter.on(".").split(databaseName.toString()));
TableName pathOverride = TableName.create(components, "");
-    CatalogSchema catalogSchema = ((CatalogManagerSchema) schema).getCatalogSchema(pathOverride);
+ CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
+ CatalogSchema catalogSchema =
+ pathOverride.catalog() != null
+ ? catalogManagerSchema.getCatalogSchema(pathOverride)
+ : catalogManagerSchema.getCurrentCatalogSchema();
catalogSchema.dropDatabase(databaseName, cascade, ifExists);
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
index 0bc5cd91161..5d0cf223406 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
@@ -53,7 +53,11 @@ public class SqlDropTable extends SqlDropObject {
BeamCalciteSchema beamCalciteSchema;
if (schema instanceof CatalogManagerSchema) {
-    CatalogSchema catalogSchema = ((CatalogManagerSchema) schema).getCatalogSchema(pathOverride);
+      CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
+ CatalogSchema catalogSchema =
+ pathOverride.catalog() != null
+ ? catalogManagerSchema.getCatalogSchema(pathOverride)
+ : catalogManagerSchema.getCurrentCatalogSchema();
beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride);
} else if (schema instanceof BeamCalciteSchema) {
beamCalciteSchema = (BeamCalciteSchema) schema;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
index f0e3fa59ddc..9d06e471dbb 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
@@ -78,7 +78,7 @@ public class SqlUseDatabase extends SqlSetOption implements BeamSqlParser.Execut
}
CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
-    CatalogSchema catalogSchema = ((CatalogManagerSchema) schema).getCatalogSchema(pathOverride);
+    CatalogSchema catalogSchema = catalogManagerSchema.getCatalogSchema(pathOverride);
// if database exists in a different catalog, we need to also switch to that catalog
if (pathOverride.catalog() != null
&& !pathOverride
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
index c387a5ace10..cbf1b45c31e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
@@ -88,6 +88,9 @@ public interface Catalog {
/** User-specified configuration properties. */
Map<String, String> properties();
+  /** Sets the given catalog properties, overriding any that already exist, and resets (removes) the properties named in {@code resetProps}. */
+  void updateProperties(Map<String, String> setProps, Collection<String> resetProps);
+
/** Registers this {@link TableProvider} and propagates it to underlying {@link MetaStore}s. */
void registerTableProvider(TableProvider provider);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
index 7c0d8b9d32e..cdee6c93022 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
@@ -34,7 +34,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
public class InMemoryCatalog implements Catalog {
private final String name;
- private final Map<String, String> properties;
+ protected final Map<String, String> properties;
protected final Map<String, TableProvider> tableProviders = new HashMap<>();
private final Map<String, MetaStore> metaStores = new HashMap<>();
private final HashSet<String> databases = new HashSet<>(Collections.singleton(DEFAULT));
@@ -77,6 +77,12 @@ public class InMemoryCatalog implements Catalog {
return Preconditions.checkStateNotNull(properties, "InMemoryCatalog has not been initialized");
}
+ @Override
+  public void updateProperties(Map<String, String> setProps, Collection<String> resetProps) {
+ properties.putAll(setProps);
+ resetProps.forEach(properties::remove);
+ }
+
@Override
public boolean createDatabase(String database) {
return databases.add(database);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/AlterTableOps.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/AlterTableOps.java
new file mode 100644
index 00000000000..3f1ea172168
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/AlterTableOps.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema;
+
+/**
+ * An interface that handles ALTER TABLE operations.
+ *
+ * <p>An instance is created and used when {@link TableProvider#alterTable(String)} is called.
+ */
+public interface AlterTableOps {
+  /**
+   * Updates a table's properties. Includes setting properties (which overwrites existing values),
+   * and/or resetting properties (removes values of given keys).
+   */
+  void updateTableProperties(Map<String, String> setProps, List<String> resetProps);
+
+  /** Updates a table's schema. Includes adding new columns and/or dropping existing columns. */
+  void updateSchema(List<Schema.Field> columnsToAdd, Collection<String> columnsToDrop);
+
+  /**
+   * Updates a table's partition spec, if applicable. Includes adding new partitions and/or dropping
+   * existing partitions.
+   */
+  void updatePartitionSpec(List<String> partitionsToAdd, Collection<String> partitionsToDrop);
+}
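[Editorial note] For a sense of the contract, a minimal in-memory implementation might look like the sketch below (hypothetical class, not part of this commit; compare AlterTestTableOps and IcebergAlterTableOps elsewhere in this diff):

    /** Sketch: backs ALTER TABLE property changes with a plain map. */
    class MapBackedAlterTableOps implements AlterTableOps {
      private final java.util.Map<String, String> props = new java.util.HashMap<>();

      @Override
      public void updateTableProperties(
          java.util.Map<String, String> setProps, java.util.List<String> resetProps) {
        resetProps.forEach(props::remove); // RESET/UNSET drops the keys
        props.putAll(setProps);            // SET overwrites existing values
      }

      @Override
      public void updateSchema(
          java.util.List<Schema.Field> columnsToAdd, java.util.Collection<String> columnsToDrop) {
        throw new UnsupportedOperationException("schema evolution not supported in this sketch");
      }

      @Override
      public void updatePartitionSpec(
          java.util.List<String> partitionsToAdd, java.util.Collection<String> partitionsToDrop) {
        throw new UnsupportedOperationException("partitioning not supported in this sketch");
      }
    }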
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
index 9be8c96b7c9..595d9cba52f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
@@ -80,4 +80,9 @@ public interface TableProvider {
default boolean supportsPartitioning(Table table) {
return false;
}
+
+ default AlterTableOps alterTable(String name) {
+ throw new UnsupportedOperationException(
+ String.format("ALTER is not supported for table '%s' of type '%s'.", name, getTableType()));
+ }
}
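
Since the default implementation throws, ALTER TABLE only works for providers that override this hook. A hedged sketch of the call path, using this commit's TestTableProvider; the table name and property values are illustrative:

import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

class AlterHookSketch {
  // Assumes a table named "orders" was created through this provider beforehand.
  static void run(TestTableProvider provider) {
    AlterTableOps ops = provider.alterTable("orders");
    ops.updateTableProperties(
        ImmutableMap.of("retention.days", "30"), // SET: overwrite or add
        ImmutableList.of("deprecated.flag")); // RESET: remove keys
  }
}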
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/AlterTestTableOps.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/AlterTestTableOps.java
new file mode 100644
index 00000000000..0ab17d3df89
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/AlterTestTableOps.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.test;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.TableWithRows;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+
+public class AlterTestTableOps implements AlterTableOps {
+ private final TableWithRows tableWithRows;
+
+ AlterTestTableOps(TableWithRows tableWithRows) {
+ this.tableWithRows = tableWithRows;
+ }
+
+ @Override
+ public void updateTableProperties(Map<String, String> setProps, List<String> resetProps) {
+ ObjectNode props = tableWithRows.getTable().getProperties();
+ resetProps.forEach(props::remove);
+ setProps.forEach(props::put);
+ tableWithRows.setTable(tableWithRows.getTable().toBuilder().properties(props).build());
+ }
+
+ @Override
+ public void updateSchema(List<Field> columnsToAdd, Collection<String> columnsToDrop) {
+ if (!columnsToAdd.isEmpty() && !tableWithRows.getRows().isEmpty()) {
+ ImmutableList.Builder<String> requiredFields = ImmutableList.builder();
+ for (Field f : columnsToAdd) {
+ if (!f.getType().getNullable()) {
+ requiredFields.add(f.getName());
+ }
+ }
+ Preconditions.checkArgument(
+ requiredFields.build().isEmpty(),
+ "Cannot add required fields %s because table '%s' already contains
rows.",
+ requiredFields.build(),
+ tableWithRows.getTable().getName());
+ }
+
+ // update the schema
+ List<Field> schemaFields = tableWithRows.getTable().getSchema().getFields();
+ ImmutableList.Builder<Field> newSchemaFields = ImmutableList.builder();
+ // remove dropped fields
+ schemaFields.stream()
+ .filter(f -> !columnsToDrop.contains(f.getName()))
+ .forEach(newSchemaFields::add);
+ // add new fields
+ newSchemaFields.addAll(columnsToAdd);
+ Schema newSchema = Schema.of(newSchemaFields.build().toArray(new Field[0]));
+ tableWithRows.setTable(tableWithRows.getTable().toBuilder().schema(newSchema).build());
+
+ // update existing rows
+ List<Row> rows = tableWithRows.getRows();
+ ImmutableList.Builder<Row> newRows = ImmutableList.builderWithExpectedSize(rows.size());
+ for (Row row : rows) {
+ Map<String, Object> values = new HashMap<>();
+ // add existing values, minus dropped columns
+ for (Field field : schemaFields) {
+ String name = field.getName();
+ if (!columnsToDrop.contains(name)) {
+ values.put(name, row.getValue(name));
+ }
+ }
+ Row newRow = Row.withSchema(newSchema).withFieldValues(values).build();
+ newRows.add(newRow);
+ }
+ tableWithRows.setRows(new CopyOnWriteArrayList<Row>(newRows.build()));
+ }
+
+ @Override
+ public void updatePartitionSpec(
+ List<String> partitionsToAdd, Collection<String> partitionsToDrop) {
+ throw new UnsupportedOperationException(
+ TestTableProvider.class.getSimpleName() + " does not support partitions.");
+ }
+}
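
As a worked illustration of the row migration above (schema and values are made up): dropped columns disappear from existing rows, and newly added nullable columns come back as null:

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

class RowMigrationSketch {
  public static void main(String[] args) {
    Schema before = Schema.builder().addInt32Field("id").addStringField("str").build();
    Row existing = Row.withSchema(before).addValues(1, "a").build();

    // After "ALTER TABLE t DROP COLUMNS (str) ADD COLUMNS (flag boolean)":
    Schema after = Schema.builder().addInt32Field("id").addNullableBooleanField("flag").build();
    // The surviving value carries over; the new nullable column is null.
    Row migrated = Row.withSchema(after).addValues(1, (Boolean) null).build();
    System.out.println(existing + " -> " + migrated);
  }
}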
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
index 375cb42c490..365ed9fbc2c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
+import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -50,6 +51,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -107,6 +109,13 @@ public class TestTableProvider extends InMemoryMetaTableProvider {
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().table));
}
+ @Override
+ public AlterTableOps alterTable(String name) {
+ TableWithRows table =
+ Preconditions.checkArgumentNotNull(tables().get(name), "Could not find table '%s'", name);
+ return new AlterTestTableOps(table);
+ }
+
@Override
public synchronized BeamSqlTable buildBeamSqlTable(Table table) {
return new InMemoryTable(tables().get(table.getName()));
@@ -133,9 +142,21 @@ public class TestTableProvider extends InMemoryMetaTableProvider {
this.rows = new CopyOnWriteArrayList<>();
}
+ public Table getTable() {
+ return table;
+ }
+
+ void setTable(Table table) {
+ this.table = table;
+ }
+
public List<Row> getRows() {
return rows;
}
+
+ void setRows(List<Row> rows) {
+ this.rows = rows;
+ }
}
private static class InMemoryTable extends BaseBeamTable {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
index 83b8685c3fe..8892cd889fd 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -150,4 +151,14 @@ public class InMemoryMetaStore implements MetaStore {
throw new IllegalStateException("No TableProvider registered for table type: " + type);
}
+
+ @Override
+ public AlterTableOps alterTable(String name) {
+ if (!tables.containsKey(name)) {
+ throw new IllegalArgumentException("No such table: " + name);
+ }
+
+ Table table = tables.get(name);
+ return getProvider(table.getType()).alterTable(name);
+ }
}
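
The metastore itself does not mutate anything: it resolves the table's type to the registered provider and delegates. A short sketch; the table name is illustrative, and the table must already exist or alterTable throws:

import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

class MetaStoreDispatchSketch {
  // Assumes a provider supporting ALTER is registered and a table "t" of that type exists.
  static void run(InMemoryMetaStore metaStore) {
    AlterTableOps ops = metaStore.alterTable("t");
    ops.updateTableProperties(ImmutableMap.of("k", "v"), ImmutableList.of());
  }
}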
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliCatalogTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliCatalogTest.java
index 0164c634814..1fc9ebcbe18 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliCatalogTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliCatalogTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.parser.SqlAlterCatalog;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog;
import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
@@ -35,6 +36,13 @@ import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.runtime.CalciteContextException;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlLiteral;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.dialect.AnsiSqlDialect;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.pretty.SqlPrettyWriter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.Before;
import org.junit.Rule;
@@ -330,4 +338,89 @@ public class BeamSqlCliCatalogTest {
cli.execute("DROP TABLE catalog_1.db_1.person");
assertNull(metastoreDb1.getTable("person"));
}
+
+ @Test
+ public void testAlterCatalog() {
+ cli.execute("CREATE CATALOG my_catalog TYPE 'local'
PROPERTIES('foo'='abc', 'bar'='xyz')");
+ cli.execute("USE CATALOG my_catalog");
+ assertEquals(
+ ImmutableMap.of("foo", "abc", "bar", "xyz"),
catalogManager.currentCatalog().properties());
+ cli.execute("ALTER CATALOG my_catalog SET ('foo'='123', 'new'='val')");
+ assertEquals(
+ ImmutableMap.of("foo", "123", "bar", "xyz", "new", "val"),
+ catalogManager.currentCatalog().properties());
+ cli.execute("ALTER CATALOG my_catalog RESET ('foo', 'bar')");
+ assertEquals(ImmutableMap.of("new", "val"),
catalogManager.currentCatalog().properties());
+ }
+
+ @Test
+ public void testUnparse_SetProperties() {
+ SqlNode catalogName = new SqlIdentifier("test_catalog", POS);
+
+ SqlNodeList setProps = new SqlNodeList(POS);
+ setProps.add(createPropertyPair("k1", "v1"));
+ setProps.add(createPropertyPair("k2", "v2"));
+
+ SqlAlterCatalog alterCatalog = new SqlAlterCatalog(POS, null, catalogName, setProps, null);
+
+ String expectedSql = "ALTER CATALOG `test_catalog` SET ('k1' = 'v1', 'k2' = 'v2')";
+ assertEquals(expectedSql, toSql(alterCatalog));
+ }
+
+ @Test
+ public void testUnparse_ResetProperties() {
+ SqlNode catalogName = new SqlIdentifier("test_catalog", POS);
+
+ SqlNodeList resetProps = new SqlNodeList(POS);
+ resetProps.add(SqlLiteral.createCharString("k1", POS));
+ resetProps.add(SqlLiteral.createCharString("k2", POS));
+
+ SqlAlterCatalog alterCatalog = new SqlAlterCatalog(POS, null, catalogName, null, resetProps);
+
+ String expectedSql = "ALTER CATALOG `test_catalog` RESET ('k1', 'k2')";
+ assertEquals(expectedSql, toSql(alterCatalog));
+ }
+
+ @Test
+ public void testUnparse_SetAndResetProperties() {
+ SqlNode catalogName = new SqlIdentifier("my_cat", POS);
+
+ SqlNodeList setProps = new SqlNodeList(POS);
+ setProps.add(createPropertyPair("k1", "v1"));
+
+ SqlNodeList resetProps = new SqlNodeList(POS);
+ resetProps.add(SqlLiteral.createCharString("k2", POS));
+
+ SqlAlterCatalog alterCatalog =
+ new SqlAlterCatalog(POS, null, catalogName, setProps, resetProps);
+
+ String expectedSql = "ALTER CATALOG `my_cat` SET ('k1' = 'v1') RESET
('k2')";
+ assertEquals(expectedSql, toSql(alterCatalog));
+ }
+
+ private static final SqlParserPos POS = SqlParserPos.ZERO;
+
+ /**
+ * Helper to execute the unparse mechanism using a PrettyWriter. This triggers SqlAlter.unparse ->
+ * SqlAlterCatalog.unparseAlterOperation.
+ */
+ private String toSql(SqlAlterCatalog node) {
+ SqlPrettyWriter writer = new SqlPrettyWriter(AnsiSqlDialect.DEFAULT);
+ writer.setAlwaysUseParentheses(false);
+ writer.setSelectListItemsOnSeparateLines(false);
+ writer.setIndentation(0);
+ node.unparse(writer, 0, 0);
+ return writer.toSqlString().getSql();
+ }
+
+ /**
+ * Helper to create the structure expected by SqlAlterCatalog for K=V pairs. Expects a SqlNodeList
+ * containing two literals [Key, Value].
+ */
+ private SqlNodeList createPropertyPair(String key, String value) {
+ SqlNodeList pair = new SqlNodeList(POS);
+ pair.add(SqlLiteral.createCharString(key, POS));
+ pair.add(SqlLiteral.createCharString(value, POS));
+ return pair;
+ }
}
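
The statements exercised above reduce to two clauses, SET and RESET, which may also appear together. A hedged summary as executable SQL strings; the catalog name and keys are illustrative:

import org.apache.beam.sdk.extensions.sql.BeamSqlCli;

class AlterCatalogSqlSketch {
  // Assumes `cli` is wired to an InMemoryCatalogManager, as in the tests above.
  static void run(BeamSqlCli cli) {
    cli.execute("CREATE CATALOG demo TYPE 'local' PROPERTIES('k'='v')");
    cli.execute("ALTER CATALOG demo SET ('k'='v2', 'extra'='x')"); // overwrite or add
    cli.execute("ALTER CATALOG demo RESET ('extra')"); // remove keys
  }
}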
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
index ffbdeb84f13..522d9c35bbf 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
@@ -24,21 +24,37 @@ import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimest
import static org.apache.beam.sdk.schemas.Schema.toSchema;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.oneOf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.time.LocalDate;
import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.stream.Stream;
import org.apache.beam.sdk.extensions.sql.impl.ParseException;
+import org.apache.beam.sdk.extensions.sql.impl.parser.SqlAlterTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlLiteral;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.dialect.AnsiSqlDialect;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.pretty.SqlPrettyWriter;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -300,4 +316,237 @@ public class BeamSqlCliTest {
// test TIMESTAMP field
assertEquals(parseTimestampWithUTCTimeZone("2018-07-01 21:26:07.123"), row.getDateTime("f_ts"));
}
+
+ @Test
+ public void testAlterTableSchema() {
+ InMemoryCatalogManager catalogManager = new InMemoryCatalogManager();
+ TestTableProvider provider = new TestTableProvider();
+ catalogManager.registerTableProvider(provider);
+ BeamSqlCli cli = new BeamSqlCli().catalogManager(catalogManager);
+
+ cli.execute(
+ "CREATE EXTERNAL TABLE test_table(id integer not null, str varchar not null, fl float) type 'test'");
+ cli.execute("INSERT INTO test_table VALUES (1, 'a', 0.1), (2, 'b', 0.2), (3, 'c', 0.3)");
+ TestTableProvider.TableWithRows tableWithRows = provider.tables().get("test_table");
+ assertNotNull(tableWithRows);
+ Schema initialSchema =
+ Schema.builder()
+ .addInt32Field("id")
+ .addStringField("str")
+ .addNullableFloatField("fl")
+ .build();
+ assertEquals(initialSchema, tableWithRows.getTable().getSchema());
+ List<Row> initialRows =
+ Arrays.asList(
+ Row.withSchema(initialSchema).addValues(1, "a", 0.1f).build(),
+ Row.withSchema(initialSchema).addValues(2, "b", 0.2f).build(),
+ Row.withSchema(initialSchema).addValues(3, "c", 0.3f).build());
+ assertThat(initialRows, everyItem(is(oneOf(tableWithRows.getRows().toArray(new Row[0])))));
+
+ cli.execute(
+ "ALTER TABLE test_table DROP COLUMNS (str, fl) ADD COLUMNS (newBool boolean, newLong bigint)");
+ cli.execute("INSERT INTO test_table VALUES (4, true, 4), (5, false, 5), (6, false, 6)");
+ Schema newSchema =
+ Schema.builder()
+ .addInt32Field("id")
+ .addNullableBooleanField("newBool")
+ .addNullableInt64Field("newLong")
+ .build();
+ assertEquals(newSchema, tableWithRows.getTable().getSchema());
+
+ // existing rows should have the corresponding values dropped
+ List<Row> newRows =
+ Arrays.asList(
+ Row.withSchema(newSchema).addValues(1, null, null).build(),
+ Row.withSchema(newSchema).addValues(2, null, null).build(),
+ Row.withSchema(newSchema).addValues(3, null, null).build(),
+ Row.withSchema(newSchema).addValues(4, true, 4L).build(),
+ Row.withSchema(newSchema).addValues(5, false, 5L).build(),
+ Row.withSchema(newSchema).addValues(6, false, 6L).build());
+ assertThat(newRows, everyItem(is(oneOf(tableWithRows.getRows().toArray(new Row[0])))));
+ }
+
+ @Test
+ public void testAlterTableProperties() {
+ InMemoryCatalogManager catalogManager = new InMemoryCatalogManager();
+ TestTableProvider provider = new TestTableProvider();
+ catalogManager.registerTableProvider(provider);
+ BeamSqlCli cli = new BeamSqlCli().catalogManager(catalogManager);
+
+ cli.execute(
+ "CREATE EXTERNAL TABLE test_table(id integer, str varchar) type 'test' "
+ + "TBLPROPERTIES '{ \"foo\" : \"123\", \"bar\" : \"abc\"}'");
+ TestTableProvider.TableWithRows tableWithRows = provider.tables().get("test_table");
+ assertNotNull(tableWithRows);
+ assertEquals("123", tableWithRows.getTable().getProperties().get("foo").asText());
+ assertEquals("abc", tableWithRows.getTable().getProperties().get("bar").asText());
+
+ cli.execute("ALTER TABLE test_table RESET('bar') SET('foo'='456', 'baz'='xyz')");
+ assertEquals("456", tableWithRows.getTable().getProperties().get("foo").asText());
+ assertEquals("xyz", tableWithRows.getTable().getProperties().get("baz").asText());
+ assertFalse(tableWithRows.getTable().getProperties().has("bar"));
+ }
+
+ private static final SqlParserPos POS = SqlParserPos.ZERO;
+
+ @Test
+ public void testUnparseAlter_AddColumns() {
+ SqlNode tableName = new SqlIdentifier("test_table", POS);
+
+ Field col1 = Field.of("new_col_1",
Schema.FieldType.STRING).withNullable(true);
+ Field col2 =
+ Field.of("new_col_2", Schema.FieldType.INT64)
+ .withNullable(false)
+ .withDescription("description for col2");
+
+ List<Field> columnsToAdd = Arrays.asList(col1, col2);
+
+ SqlAlterTable alterTable =
+ new SqlAlterTable(POS, null, tableName, columnsToAdd, null, null, null, null, null);
+
+ String expectedSql =
+ "ALTER TABLE `test_table` "
+ + "ADD COLUMNS (`new_col_1` VARCHAR, "
+ + "`new_col_2` BIGINT NOT NULL COMMENT `description for col2`)";
+
+ assertEquals(expectedSql, toSql(alterTable));
+ }
+
+ @Test
+ public void testUnparseAlter_DropColumns() {
+ SqlNode tableName = new SqlIdentifier("test_table", POS);
+
+ SqlNodeList columnsToDrop = createStringList("col_to_drop_1",
"col_to_drop_2");
+
+ SqlAlterTable alterTable =
+ new SqlAlterTable(POS, null, tableName, null, columnsToDrop, null, null, null, null);
+
+ String expectedSql = "ALTER TABLE `test_table` DROP COLUMNS
(`col_to_drop_1`, `col_to_drop_2`)";
+ assertEquals(expectedSql, toSql(alterTable));
+ }
+
+ @Test
+ public void testUnparseAlter_AddColumnsAndDropColumns() {
+ SqlNode tableName = new SqlIdentifier("test_table", POS);
+
+ // Setup Add
+ Field col1 = Field.of("new_col",
Schema.FieldType.BOOLEAN).withNullable(true);
+ List<Field> columnsToAdd = Arrays.asList(col1);
+
+ // Setup Drop
+ SqlNodeList columnsToDrop = createStringList("col_to_drop");
+
+ SqlAlterTable alterTable =
+ new SqlAlterTable(
+ POS, null, tableName, columnsToAdd, columnsToDrop, null, null, null, null);
+
+ // unparses DROP before ADD
+ String expectedSql =
+ "ALTER TABLE `test_table` "
+ + "DROP COLUMNS (`col_to_drop`) "
+ + "ADD COLUMNS (`new_col` BOOLEAN)";
+
+ assertEquals(expectedSql, toSql(alterTable));
+ }
+
+ @Test
+ public void testUnparseAlter_AddAndDropPartitions() {
+ SqlNode tableName = new SqlIdentifier("test_table", POS);
+
+ SqlNodeList partsToAdd = createStringList("p1", "p2");
+ SqlNodeList partsToDrop = createStringList("p3");
+
+ SqlAlterTable alterTable =
+ new SqlAlterTable(POS, null, tableName, null, null, partsToAdd, partsToDrop, null, null);
+
+ // unparses DROP before ADD
+ String expectedSql =
+ "ALTER TABLE `test_table` DROP PARTITIONS (`p3`) ADD PARTITIONS (`p1`,
`p2`)";
+
+ assertEquals(expectedSql, toSql(alterTable));
+ }
+
+ @Test
+ public void testUnparseAlter_TableProperties() {
+ SqlNode tableName = new SqlIdentifier("test_table", POS);
+
+ SqlNodeList setProps = new SqlNodeList(POS);
+ setProps.add(createPropertyPair("prop1", "val1"));
+
+ SqlNodeList resetProps = createStringList("prop2");
+
+ SqlAlterTable alterTable =
+ new SqlAlterTable(POS, null, tableName, null, null, null, null, setProps, resetProps);
+
+ // unparses RESET before SET
+ String expectedSql = "ALTER TABLE `test_table` RESET ('prop2') SET
('prop1' = 'val1')";
+
+ assertEquals(expectedSql, toSql(alterTable));
+ }
+
+ @Test
+ public void testUnparseAlter_AllOperations() {
+ // A comprehensive test combining all clauses to verify strict ordering
+ SqlNode tableName = new SqlIdentifier("full_table", POS);
+
+ List<Field> addCols = Collections.singletonList(Field.of("c1", Schema.FieldType.BOOLEAN));
+ SqlNodeList dropCols = createStringList("c_old");
+ SqlNodeList addParts = createStringList("p_new");
+ SqlNodeList dropParts = createStringList("p_old");
+ SqlNodeList setProps = new SqlNodeList(POS);
+ setProps.add(createPropertyPair("k", "v"));
+ SqlNodeList resetProps = createStringList("k_reset");
+
+ SqlAlterTable alterTable =
+ new SqlAlterTable(
+ POS, null, tableName, addCols, dropCols, addParts, dropParts, setProps, resetProps);
+
+ // Expected Order based on source code:
+ // 1. DROP COLUMNS
+ // 2. ADD COLUMNS
+ // 3. DROP PARTITIONS
+ // 4. ADD PARTITIONS
+ // 5. RESET
+ // 6. SET
+ String expectedSql =
+ "ALTER TABLE `full_table` "
+ + "DROP COLUMNS (`c_old`) "
+ + "ADD COLUMNS (`c1` BOOLEAN NOT NULL) "
+ + "DROP PARTITIONS (`p_old`) "
+ + "ADD PARTITIONS (`p_new`) "
+ + "RESET ('k_reset') "
+ + "SET ('k' = 'v')";
+
+ assertEquals(expectedSql, toSql(alterTable));
+ }
+
+ /** Helper to execute the unparse mechanism using a PrettyWriter. */
+ private String toSql(SqlAlterTable node) {
+ SqlPrettyWriter writer = new SqlPrettyWriter(AnsiSqlDialect.DEFAULT);
+ writer.setAlwaysUseParentheses(false);
+ writer.setSelectListItemsOnSeparateLines(false);
+ writer.setIndentation(0);
+ node.unparse(writer, 0, 0);
+ return writer.toSqlString().getSql();
+ }
+
+ /**
+ * Helper to create a list of string literals. Useful for Drop Columns, Add/Drop Partitions, Reset
+ * Props.
+ */
+ private SqlNodeList createStringList(String... values) {
+ SqlNodeList list = new SqlNodeList(POS);
+ for (String val : values) {
+ list.add(SqlLiteral.createCharString(val, POS));
+ }
+ return list;
+ }
+
+ /** Helper to create Key=Value pair for Set Properties. */
+ private SqlNodeList createPropertyPair(String key, String value) {
+ SqlNodeList pair = new SqlNodeList(POS);
+ pair.add(SqlLiteral.createCharString(key, POS));
+ pair.add(SqlLiteral.createCharString(value, POS));
+ return pair;
+ }
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index 7603e2c6259..748dd319c07 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.iceberg;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -27,18 +28,23 @@ import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
+import org.apache.iceberg.UpdatePartitionSpec;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.types.Type;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;
@@ -143,7 +149,10 @@ public abstract class IcebergCatalogConfig implements Serializable {
}
public void createTable(
- String tableIdentifier, Schema tableSchema, @Nullable List<String> partitionFields) {
+ String tableIdentifier,
+ Schema tableSchema,
+ @Nullable List<String> partitionFields,
+ Map<String, String> properties) {
TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema);
PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
@@ -153,7 +162,7 @@ public abstract class IcebergCatalogConfig implements Serializable {
icebergIdentifier,
icebergSchema,
icebergSpec);
- catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec);
+ catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec, properties);
LOG.info("Successfully created table '{}'.", icebergIdentifier);
} catch (AlreadyExistsException e) {
throw new TableAlreadyExistsException(e);
@@ -164,28 +173,92 @@ public abstract class IcebergCatalogConfig implements Serializable {
TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
try {
Table table = catalog().loadTable(icebergIdentifier);
- return IcebergTableInfo.create(
- tableIdentifier,
- IcebergUtils.icebergSchemaToBeamSchema(table.schema()),
- table.properties());
+ return new IcebergTableInfo(tableIdentifier, table);
} catch (NoSuchTableException ignored) {
return null;
}
}
// Helper class to pass information to Beam SQL module without relying on Iceberg deps
- @AutoValue
- public abstract static class IcebergTableInfo {
- public abstract String getIdentifier();
+ public static class IcebergTableInfo {
+ private final String identifier;
+ private final Table table;
- public abstract Schema getSchema();
+ IcebergTableInfo(String identifier, Table table) {
+ this.identifier = identifier;
+ this.table = table;
+ }
+
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ public Schema getSchema() {
+ return IcebergUtils.icebergSchemaToBeamSchema(table.schema());
+ }
+
+ public Map<String, String> getProperties() {
+ return table.properties();
+ }
+
+ public void updateTableProps(Map<String, String> setProps, List<String> resetProps) {
+ if (setProps.isEmpty() && resetProps.isEmpty()) {
+ return;
+ }
+
+ UpdateProperties update = table.updateProperties();
+ resetProps.forEach(update::remove);
+ setProps.forEach(update::set);
+
+ update.commit();
+ }
- public abstract Map<String, String> getProperties();
+ public void updateSchema(List<Schema.Field> columnsToAdd, Collection<String> columnsToDrop) {
+ if (columnsToAdd.isEmpty() && columnsToDrop.isEmpty()) {
+ return;
+ }
+ UpdateSchema update = table.updateSchema();
+ ImmutableList.Builder<String> requiredColumns = ImmutableList.builder();
+
+ for (Schema.Field col : columnsToAdd) {
+ String name = col.getName();
+ Type type = IcebergUtils.beamFieldTypeToIcebergFieldType(col.getType(), 0).type;
+ String desc = col.getDescription();
+
+ if (col.getType().getNullable()) {
+ if (desc.isEmpty()) {
+ update.addColumn(name, type);
+ } else {
+ update.addColumn(name, type, desc);
+ }
+ } else {
+ requiredColumns.add(name);
+ }
+ }
+ if (!requiredColumns.build().isEmpty()) {
+ throw new UnsupportedOperationException(
+ "Adding required columns is not yet supported. "
+ + "Encountered required columns: "
+ + requiredColumns.build());
+ }
+
+ columnsToDrop.forEach(update::deleteColumn);
- static IcebergTableInfo create(
- String identifier, Schema schema, Map<String, String> properties) {
- return new AutoValue_IcebergCatalogConfig_IcebergTableInfo(identifier, schema, properties);
- };
+ update.commit();
+ }
+
+ public void updatePartitionSpec(
+ List<String> partitionsToAdd, Collection<String> partitionsToDrop) {
+ if (partitionsToAdd.isEmpty() && partitionsToDrop.isEmpty()) {
+ return;
+ }
+ UpdatePartitionSpec update = table.updateSpec();
+
+ partitionsToDrop.stream().map(PartitionUtils::toIcebergTerm).forEach(update::removeField);
+ partitionsToAdd.stream().map(PartitionUtils::toIcebergTerm).forEach(update::addField);
+
+ update.commit();
+ }
}
public boolean dropTable(String tableIdentifier) {
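
Each update method on IcebergTableInfo opens and commits its own Iceberg update (UpdateProperties, UpdateSchema, UpdatePartitionSpec), so callers can apply them independently. A hedged usage sketch; the property key is a standard Iceberg table property, and the partition DSL spelling is an assumption:

import java.util.Collections;
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig.IcebergTableInfo;

class IcebergTableInfoSketch {
  static void run(IcebergTableInfo info) {
    // SET one property, RESET none; commits an Iceberg properties update.
    info.updateTableProps(
        Collections.singletonMap("write.format.default", "parquet"), Collections.emptyList());
    // Add one partition field, drop none; "day(ts)" assumes the DSL parsed by PartitionUtils.
    info.updatePartitionSpec(Collections.singletonList("day(ts)"), Collections.emptyList());
  }
}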
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
index 4b94663c64c..2b3117f8bf8 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
@@ -22,11 +22,14 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Term;
import org.checkerframework.checker.nullness.qual.Nullable;
class PartitionUtils {
@@ -90,4 +93,38 @@ class PartitionUtils {
return builder.build();
}
+
+ private static final Map<Pattern, Function<Matcher, Term>> TERMS =
+ ImmutableMap.of(
+ HOUR,
+ matcher -> Expressions.hour(checkStateNotNull(matcher.group(1))),
+ DAY,
+ matcher -> Expressions.day(checkStateNotNull(matcher.group(1))),
+ MONTH,
+ matcher -> Expressions.month(checkStateNotNull(matcher.group(1))),
+ YEAR,
+ matcher -> Expressions.year(checkStateNotNull(matcher.group(1))),
+ TRUNCATE,
+ matcher ->
+ Expressions.truncate(
+ checkStateNotNull(matcher.group(1)),
+ Integer.parseInt(checkStateNotNull(matcher.group(2)))),
+ BUCKET,
+ matcher ->
+ Expressions.bucket(
+ checkStateNotNull(matcher.group(1)),
+ Integer.parseInt(checkStateNotNull(matcher.group(2)))),
+ IDENTITY,
+ matcher -> Expressions.ref(checkStateNotNull(matcher.group(1))));
+
+ static Term toIcebergTerm(String field) {
+ for (Map.Entry<Pattern, Function<Matcher, Term>> entry : TERMS.entrySet()) {
+ Matcher matcher = entry.getKey().matcher(field);
+ if (matcher.find()) {
+ return entry.getValue().apply(matcher);
+ }
+ }
+
+ throw new IllegalArgumentException("Could not find a partition term for '"
+ field + "'.");
+ }
}
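
For intuition, the table-driven lookup maps partition DSL strings onto Iceberg expression terms roughly as below. The exact Pattern definitions (HOUR, BUCKET, ...) live earlier in this file, so the DSL spellings shown in the comments are assumptions:

import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Term;

class TermMappingSketch {
  public static void main(String[] args) {
    Term hourly = Expressions.hour("event_ts"); // e.g. "hour(event_ts)"
    Term bucketed = Expressions.bucket("user_id", 16); // e.g. "bucket(user_id, 16)"
    Term truncated = Expressions.truncate("name", 4); // e.g. "truncate(name, 4)"
    Term identity = Expressions.ref("country"); // plain column name, the IDENTITY case
    System.out.printf("%s %s %s %s%n", hourly, bucketed, truncated, identity);
  }
}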
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
index 893e24b6155..591467ce0d0 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
@@ -823,14 +823,8 @@ public class FilterUtilsTest {
ImmutableSet<Operation> inOperations = ImmutableSet.of(Operation.IN, Operation.NOT_IN);
if (inOperations.contains(expected.op())) {
- System.out.printf(
- "xxx op: %s, literals: %s, ref: %s%n",
- expected.op(), expected.literals(), expected.ref().name());
assertEquals(expected.literals(), actual.literals());
} else {
- System.out.printf(
- "xxx op: %s, literal: %s, ref: %s%n",
- expected.op(), expected.literal(), expected.ref().name());
assertEquals(expected.literal(), actual.literal());
}
}