This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f96db3733c2 [SQL] Alter Catalog and Table statements (#36571)
f96db3733c2 is described below

commit f96db3733c2486b8cbe4a8f3fe479cdef5e75cff
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Mon Feb 9 12:01:37 2026 -0500

    [SQL] Alter Catalog and Table statements (#36571)
    
    * alter catalog
    
    * alter table
    
    * spotless
    
    * spotless
    
    * fix unparse method, cleanup
    
    * add more tests
    
    * add deps
    
    * address comments: add java docs; add unit tests for unparsing; fix nits
    
    * spotless
---
 .github/trigger_files/beam_PostCommit_SQL.json     |   2 +-
 .github/trigger_files/beam_PreCommit_SQL.json      |   2 +-
 sdks/java/extensions/sql/iceberg/build.gradle      |   2 +
 .../provider/iceberg/IcebergAlterTableOps.java     |  50 ++++
 .../sql/meta/provider/iceberg/IcebergCatalog.java  |  50 ++--
 .../meta/provider/iceberg/IcebergMetastore.java    |  16 +-
 .../iceberg/BeamSqlCliIcebergAlterTest.java        | 231 ++++++++++++++++++
 .../extensions/sql/src/main/codegen/config.fmpp    |   7 +
 .../sql/src/main/codegen/includes/parserImpls.ftl  | 120 +++++++++-
 .../sdk/extensions/sql/impl/CatalogSchema.java     |   4 +
 .../sql/impl/parser/SqlAlterCatalog.java           | 164 +++++++++++++
 .../extensions/sql/impl/parser/SqlAlterTable.java  | 259 +++++++++++++++++++++
 .../sql/impl/parser/SqlCreateExternalTable.java    |   2 +-
 .../extensions/sql/impl/parser/SqlDdlNodes.java    |  41 ++++
 .../sql/impl/parser/SqlDropDatabase.java           |   6 +-
 .../extensions/sql/impl/parser/SqlDropTable.java   |   6 +-
 .../extensions/sql/impl/parser/SqlUseDatabase.java |   2 +-
 .../sdk/extensions/sql/meta/catalog/Catalog.java   |   3 +
 .../sql/meta/catalog/InMemoryCatalog.java          |   8 +-
 .../sql/meta/provider/AlterTableOps.java           |  45 ++++
 .../sql/meta/provider/TableProvider.java           |   5 +
 .../sql/meta/provider/test/AlterTestTableOps.java  | 101 ++++++++
 .../sql/meta/provider/test/TestTableProvider.java  |  21 ++
 .../sql/meta/store/InMemoryMetaStore.java          |  11 +
 .../sdk/extensions/sql/BeamSqlCliCatalogTest.java  |  93 ++++++++
 .../beam/sdk/extensions/sql/BeamSqlCliTest.java    | 249 ++++++++++++++++++++
 .../beam/sdk/io/iceberg/IcebergCatalogConfig.java  | 103 ++++++--
 .../apache/beam/sdk/io/iceberg/PartitionUtils.java |  37 +++
 .../beam/sdk/io/iceberg/FilterUtilsTest.java       |   6 -
 29 files changed, 1595 insertions(+), 51 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_SQL.json 
b/.github/trigger_files/beam_PostCommit_SQL.json
index 5df3841d236..e584718ac8f 100644
--- a/.github/trigger_files/beam_PostCommit_SQL.json
+++ b/.github/trigger_files/beam_PostCommit_SQL.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run ",
-  "modification": 3
+  "modification": 0
 }
diff --git a/.github/trigger_files/beam_PreCommit_SQL.json 
b/.github/trigger_files/beam_PreCommit_SQL.json
index ab4daeae234..07d1fb88996 100644
--- a/.github/trigger_files/beam_PreCommit_SQL.json
+++ b/.github/trigger_files/beam_PreCommit_SQL.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-  "modification": 3
+  "modification": 0
 }
diff --git a/sdks/java/extensions/sql/iceberg/build.gradle 
b/sdks/java/extensions/sql/iceberg/build.gradle
index d5f9e74c53b..1e319c97a8e 100644
--- a/sdks/java/extensions/sql/iceberg/build.gradle
+++ b/sdks/java/extensions/sql/iceberg/build.gradle
@@ -31,6 +31,8 @@ dependencies {
   implementation project(":sdks:java:core")
   implementation project(":sdks:java:managed")
   implementation project(":sdks:java:io:iceberg")
+  implementation library.java.jackson_databind
+  implementation library.java.jackson_core
   runtimeOnly project(":sdks:java:io:iceberg:bqms")
   runtimeOnly project(":sdks:java:io:iceberg:hive")
   // TODO(https://github.com/apache/beam/issues/21156): Determine how to build 
without this dependency
diff --git 
a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergAlterTableOps.java
 
b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergAlterTableOps.java
new file mode 100644
index 00000000000..0c8a5519ee4
--- /dev/null
+++ 
b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergAlterTableOps.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
+import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig.IcebergTableInfo;
+import org.apache.beam.sdk.schemas.Schema;
+
+/** {@link AlterTableOps} for Iceberg tables. */
+public class IcebergAlterTableOps implements AlterTableOps {
+  private final IcebergTableInfo table;
+
+  IcebergAlterTableOps(IcebergTableInfo table) {
+    this.table = table;
+  }
+
+  @Override
+  public void updateTableProperties(Map<String, String> setProps, List<String> 
resetProps) {
+    table.updateTableProps(setProps, resetProps);
+  }
+
+  @Override
+  public void updateSchema(List<Schema.Field> columnsToAdd, Collection<String> 
columnsToDrop) {
+    table.updateSchema(columnsToAdd, columnsToDrop);
+  }
+
+  @Override
+  public void updatePartitionSpec(
+      List<String> partitionsToAdd, Collection<String> partitionsToDrop) {
+    table.updatePartitionSpec(partitionsToAdd, partitionsToDrop);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
 
b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
index 7dee72511e8..6330f550a2b 100644
--- 
a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
+++ 
b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
@@ -32,28 +32,11 @@ public class IcebergCatalog extends InMemoryCatalog {
   //  other SDKs can make use of it too
   private static final String BEAM_HADOOP_PREFIX = "beam.catalog.hadoop";
   private final Map<String, IcebergMetastore> metaStores = new HashMap<>();
-  @VisibleForTesting final IcebergCatalogConfig catalogConfig;
+  @VisibleForTesting IcebergCatalogConfig catalogConfig;
 
-  public IcebergCatalog(String name, Map<String, String> properties) {
-    super(name, properties);
-
-    ImmutableMap.Builder<String, String> catalogProps = ImmutableMap.builder();
-    ImmutableMap.Builder<String, String> hadoopProps = ImmutableMap.builder();
-
-    for (Map.Entry<String, String> entry : properties.entrySet()) {
-      if (entry.getKey().startsWith(BEAM_HADOOP_PREFIX)) {
-        hadoopProps.put(entry.getKey(), entry.getValue());
-      } else {
-        catalogProps.put(entry.getKey(), entry.getValue());
-      }
-    }
-
-    catalogConfig =
-        IcebergCatalogConfig.builder()
-            .setCatalogName(name)
-            .setCatalogProperties(catalogProps.build())
-            .setConfigProperties(hadoopProps.build())
-            .build();
+  public IcebergCatalog(String name, Map<String, String> props) {
+    super(name, props);
+    catalogConfig = initConfig(name, props);
   }
 
   @Override
@@ -67,6 +50,12 @@ public class IcebergCatalog extends InMemoryCatalog {
     return "iceberg";
   }
 
+  @Override
+  public void updateProperties(Map<String, String> setProps, 
Collection<String> resetProps) {
+    super.updateProperties(setProps, resetProps);
+    catalogConfig = initConfig(name(), properties());
+  }
+
   @Override
   public boolean createDatabase(String database) {
     return catalogConfig.createNamespace(database);
@@ -97,4 +86,23 @@ public class IcebergCatalog extends InMemoryCatalog {
     }
     return removed;
   }
+
+  private static IcebergCatalogConfig initConfig(String name, Map<String, 
String> properties) {
+    ImmutableMap.Builder<String, String> catalogProps = ImmutableMap.builder();
+    ImmutableMap.Builder<String, String> hadoopProps = ImmutableMap.builder();
+
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      if (entry.getKey().startsWith(BEAM_HADOOP_PREFIX)) {
+        hadoopProps.put(entry.getKey(), entry.getValue());
+      } else {
+        catalogProps.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    return IcebergCatalogConfig.builder()
+        .setCatalogName(name)
+        .setCatalogProperties(catalogProps.build())
+        .setConfigProperties(hadoopProps.build())
+        .build();
+  }
 }
diff --git 
a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java
 
b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java
index b73aa25c7a2..678c76153c5 100644
--- 
a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java
+++ 
b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java
@@ -20,6 +20,7 @@ package 
org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.TableUtils;
@@ -60,7 +61,11 @@ public class IcebergMetastore extends InMemoryMetaStore {
     } else {
       String identifier = getIdentifier(table);
       try {
-        catalogConfig.createTable(identifier, table.getSchema(), 
table.getPartitionFields());
+        Map<String, String> properties =
+            TableUtils.getObjectMapper()
+                .convertValue(table.getProperties(), new 
TypeReference<Map<String, String>>() {});
+        catalogConfig.createTable(
+            identifier, table.getSchema(), table.getPartitionFields(), 
properties);
       } catch (TableAlreadyExistsException e) {
         LOG.info(
             "Iceberg table '{}' already exists at location '{}'.", 
table.getName(), identifier);
@@ -147,6 +152,15 @@ public class IcebergMetastore extends InMemoryMetaStore {
     return getProvider(table.getType()).supportsPartitioning(table);
   }
 
+  @Override
+  public IcebergAlterTableOps alterTable(String name) {
+    IcebergTableInfo table =
+        checkStateNotNull(
+            catalogConfig.loadTable(getIdentifier(name)), "Could not find 
table '%s'", name);
+
+    return new IcebergAlterTableOps(table);
+  }
+
   @Override
   public void registerProvider(TableProvider provider) {
     super.registerProvider(provider);
diff --git 
a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergAlterTest.java
 
b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergAlterTest.java
new file mode 100644
index 00000000000..b6fd2b14a5e
--- /dev/null
+++ 
b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergAlterTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
+
+import static java.lang.String.format;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+/** Unit tests specifically for ALTERing Iceberg catalogs and tables. */
+public class BeamSqlCliIcebergAlterTest {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+  private InMemoryCatalogManager catalogManager;
+  private BeamSqlCli cli;
+  private String warehouse;
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Before
+  public void setup() throws IOException {
+    catalogManager = new InMemoryCatalogManager();
+    cli = new BeamSqlCli().catalogManager(catalogManager);
+    File warehouseFile = TEMPORARY_FOLDER.newFolder();
+    assertTrue(warehouseFile.delete());
+    warehouse = "file:" + warehouseFile + "/" + UUID.randomUUID();
+  }
+
+  private String createCatalog(String name) {
+    return format("CREATE CATALOG %s \n", name)
+        + "TYPE iceberg \n"
+        + "PROPERTIES (\n"
+        + "  'type' = 'hadoop', \n"
+        + format("  'warehouse' = '%s')", warehouse);
+  }
+
+  @Test
+  public void testAlterCatalog() {
+    cli.execute(createCatalog("my_catalog"));
+    IcebergCatalog catalog =
+        (IcebergCatalog) 
checkStateNotNull(catalogManager.getCatalog("my_catalog"));
+    Map<String, String> expected = ImmutableMap.of("type", "hadoop", 
"warehouse", warehouse);
+    assertEquals(expected, catalog.properties());
+    assertEquals(expected, catalog.catalogConfig.getCatalogProperties());
+
+    cli.execute("ALTER CATALOG my_catalog SET ('abc'='123', 'foo'='bar') RESET 
('type')");
+    expected = ImmutableMap.of("warehouse", warehouse, "abc", "123", "foo", 
"bar");
+    assertEquals(expected, catalog.properties());
+    assertEquals(expected, catalog.catalogConfig.getCatalogProperties());
+  }
+
+  @Test
+  public void testAlterTableProps() {
+    cli.execute(createCatalog("my_catalog"));
+    cli.execute("CREATE DATABASE my_catalog.my_db");
+    cli.execute("USE DATABASE my_catalog.my_db");
+    cli.execute(
+        "CREATE EXTERNAL TABLE my_table(col1 VARCHAR, col2 INTEGER) TYPE 
'iceberg' TBLPROPERTIES '{ \"prop1\" : \"123\", \"prop2\" : \"abc\"}'");
+    IcebergCatalog catalog = (IcebergCatalog) catalogManager.currentCatalog();
+    Table table =
+        
catalog.catalogConfig.catalog().loadTable(TableIdentifier.parse("my_db.my_table"));
+    assertThat(table.properties(), allOf(hasEntry("prop1", "123"), 
hasEntry("prop2", "abc")));
+
+    cli.execute("ALTER TABLE my_table SET('prop3'='foo')");
+    table.refresh();
+    assertThat(
+        table.properties(),
+        allOf(hasEntry("prop1", "123"), hasEntry("prop2", "abc"), 
hasEntry("prop3", "foo")));
+
+    cli.execute("ALTER TABLE my_table RESET ('prop1') SET ('prop2'='xyz')");
+    table.refresh();
+    assertThat(table.properties(), allOf(hasEntry("prop2", "xyz"), 
hasEntry("prop3", "foo")));
+  }
+
+  @Test
+  public void testAlterTableSchema() {
+    cli.execute(createCatalog("my_catalog"));
+    cli.execute("CREATE DATABASE my_catalog.my_db");
+    cli.execute("USE DATABASE my_catalog.my_db");
+    cli.execute("CREATE EXTERNAL TABLE my_table(col1 VARCHAR, col2 INTEGER) 
TYPE 'iceberg'");
+    IcebergCatalog catalog = (IcebergCatalog) catalogManager.currentCatalog();
+    Table table =
+        
catalog.catalogConfig.catalog().loadTable(TableIdentifier.parse("my_db.my_table"));
+    Schema actualSchema = table.schema();
+    Schema expectedSchema =
+        new Schema(
+            optional(1, "col1", Types.StringType.get()),
+            optional(2, "col2", Types.IntegerType.get()));
+    assertTrue(
+        String.format("Unequal schemas.\nExpected: %s\nActual: %s", 
expectedSchema, actualSchema),
+        expectedSchema.sameSchema(actualSchema));
+
+    // add some columns
+    cli.execute(
+        "ALTER TABLE my_table ADD COLUMNS (col3 BOOLEAN COMMENT 
'col3-comment', col4 FLOAT COMMENT 'col4-comment')");
+    table.refresh();
+    actualSchema = table.schema();
+    expectedSchema =
+        new Schema(
+            optional(1, "col1", Types.StringType.get()),
+            optional(2, "col2", Types.IntegerType.get()),
+            optional(3, "col3", Types.BooleanType.get(), "col3-comment"),
+            optional(4, "col4", Types.FloatType.get(), "col4-comment"));
+    assertTrue(
+        String.format("Unequal schemas.\nExpected: %s\nActual: %s", 
expectedSchema, actualSchema),
+        expectedSchema.sameSchema(actualSchema));
+
+    // remove some columns and add other columns
+    cli.execute(
+        "ALTER TABLE my_table DROP COLUMNS (col1, col2, col3) ADD COLUMNS 
(colA VARCHAR, colB INTEGER)");
+    table.refresh();
+    actualSchema = table.schema();
+    expectedSchema =
+        new Schema(
+            optional(4, "col4", Types.FloatType.get(), "col4-comment"),
+            optional(5, "colA", Types.StringType.get()),
+            optional(6, "colB", Types.IntegerType.get()));
+    assertTrue(
+        String.format("Unequal schemas.\nExpected: %s\nActual: %s", 
expectedSchema, actualSchema),
+        expectedSchema.sameSchema(actualSchema));
+  }
+
+  @Test
+  public void testAlterTableSchemaFailsHelpfullyWhenAddingRequiredColumns() {
+    // adding required columns is not yet supported because Beam Schemas do not
+    // allow specifying a 'default value' for fields. This concept is required 
when adding a new
+    // Iceberg columns because it allows previously written rows to default to 
this value.
+    cli.execute(createCatalog("my_catalog"));
+    cli.execute("CREATE DATABASE my_catalog.my_db");
+    cli.execute("USE DATABASE my_catalog.my_db");
+    cli.execute("CREATE EXTERNAL TABLE my_table(col1 VARCHAR, col2 INTEGER) 
TYPE 'iceberg'");
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(
+        "Adding required columns is not yet supported. Encountered required 
columns: [col3]");
+    cli.execute("ALTER TABLE my_table ADD COLUMNS (col3 BOOLEAN NOT NULL)");
+  }
+
+  @Test
+  public void testAlterTablePartitionSpec() {
+    cli.execute(createCatalog("my_catalog"));
+    cli.execute("CREATE DATABASE my_catalog.my_db");
+    cli.execute("USE DATABASE my_catalog.my_db");
+    cli.execute(
+        "CREATE EXTERNAL TABLE my_table(col1 VARCHAR, col2 INTEGER, col3 
FLOAT, col4 BOOLEAN, col5 TIMESTAMP) "
+            + "TYPE 'iceberg' PARTITIONED BY ('col3', 'bucket(col2, 3)')");
+    IcebergCatalog catalog = (IcebergCatalog) catalogManager.currentCatalog();
+    Table table =
+        
catalog.catalogConfig.catalog().loadTable(TableIdentifier.parse("my_db.my_table"));
+    PartitionSpec actualSpec = table.spec();
+    PartitionSpec expectedSpec =
+        
PartitionSpec.builderFor(table.schema()).identity("col3").bucket("col2", 
3).build();
+    assertTrue(
+        String.format(
+            "Partition specs are not compatible.\nExpected: %s\nActual: %s",
+            expectedSpec, actualSpec),
+        expectedSpec.compatibleWith(actualSpec));
+
+    // add some partitions
+    cli.execute("ALTER TABLE my_table ADD PARTITIONS ('col4', 'month(col5)')");
+    table.refresh();
+    actualSpec = table.spec();
+    expectedSpec =
+        PartitionSpec.builderFor(table.schema())
+            .identity("col3")
+            .bucket("col2", 3)
+            .identity("col4")
+            .month("col5")
+            .withSpecId(table.spec().specId())
+            .build();
+    assertTrue(
+        String.format(
+            "Partition specs are not compatible.\nExpected: %s\nActual: %s",
+            expectedSpec, actualSpec),
+        expectedSpec.compatibleWith(actualSpec));
+
+    // remove some partitions and add other partitions
+    cli.execute(
+        "ALTER TABLE my_table DROP PARTITIONS ('month(col5)', 'bucket(col2, 
3)') ADD PARTITIONS ('hour(col5)')");
+    table.refresh();
+    actualSpec = table.spec();
+    expectedSpec =
+        PartitionSpec.builderFor(table.schema())
+            .identity("col3")
+            .identity("col4")
+            .hour("col5")
+            .withSpecId(table.spec().specId())
+            .build();
+    assertTrue(
+        String.format(
+            "Partition specs are not compatible.\nExpected: %s\nActual: %s",
+            expectedSpec, actualSpec),
+        expectedSpec.compatibleWith(actualSpec));
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/codegen/config.fmpp 
b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
index 73af7e18150..c3692430610 100644
--- a/sdks/java/extensions/sql/src/main/codegen/config.fmpp
+++ b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
@@ -36,6 +36,8 @@ data: {
         "org.apache.beam.sdk.extensions.sql.impl.parser.SqlUseCatalog"
         "org.apache.beam.sdk.extensions.sql.impl.parser.SqlUseDatabase"
         "org.apache.beam.sdk.extensions.sql.impl.parser.SqlSetOptionBeam"
+        "org.apache.beam.sdk.extensions.sql.impl.parser.SqlAlterCatalog"
+        "org.apache.beam.sdk.extensions.sql.impl.parser.SqlAlterTable"
         "org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils"
         "org.apache.beam.sdk.schemas.Schema"
       ]
@@ -53,7 +55,10 @@ data: {
         "CATALOGS"
         "DATABASES"
         "TABLES"
+        "COLUMNS"
+        "PARTITIONS"
         "USE"
+        "UNSET"
       ]
 
       # List of keywords from "keywords" section that are not reserved.
@@ -432,6 +437,8 @@ data: {
         "SqlUseCatalog(Span.of(), null)"
         "SqlUseDatabase(Span.of(), null)"
         "SqlSetOptionBeam(Span.of(), null)"
+        "SqlAlterCatalog(Span.of(), null)"
+        "SqlAlterTable(Span.of(), null)"
       ]
 
       # List of methods for parsing custom literals.
diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl 
b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
index d3bb8c2af56..94c0161c492 100644
--- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
+++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
@@ -178,6 +178,21 @@ SqlNode Property() :
     }
 }
 
+SqlNodeList ArgList() :
+{
+    SqlNodeList list = new SqlNodeList(getPos());
+    SqlNode property;
+}
+{
+    property = StringLiteral() { list.add(property); }
+    (
+        <COMMA> property = StringLiteral() { list.add(property); }
+    )*
+    {
+        return list;
+    }
+}
+
 /**
  * CREATE CATALOG ( IF NOT EXISTS )? catalog_name
  *   TYPE type_name
@@ -247,6 +262,41 @@ SqlCall SqlUseCatalog(Span s, String scope) :
 }
 
 
+/**
+ * ALTER CATALOG catalog_name
+ *   [ SET (key1=val1, key2=val2, ...) ]
+ *   [ (RESET | UNSET) (key1, key2, ...) ]
+ */
+SqlCall SqlAlterCatalog(Span s, String scope) :
+{
+    final SqlNode catalogName;
+    SqlNodeList setProps = null;
+    SqlNodeList resetProps = null;
+}
+{
+    <ALTER> {
+        s.add(this);
+    }
+    <CATALOG>
+    (
+        catalogName = CompoundIdentifier()
+        |
+        catalogName = StringLiteral()
+    )
+    [ <SET> <LPAREN> setProps = PropertyList() <RPAREN> ]
+    [ (<RESET> | <UNSET>) <LPAREN> resetProps = ArgList() <RPAREN> ]
+
+    {
+        return new SqlAlterCatalog(
+            s.end(this),
+            scope,
+            catalogName,
+            setProps,
+            resetProps);
+    }
+}
+
+
 SqlDrop SqlDropCatalog(Span s, boolean replace) :
 {
     final boolean ifExists;
@@ -464,6 +514,18 @@ SqlCall SqlShowCurrent(Span s) :
     }
 }
 
+SqlNodeList PartitionFieldsParens() :
+{
+    final SqlNodeList partitions;
+}
+{
+    <LPAREN>
+    partitions = PartitionFieldList()
+    <RPAREN>
+    {
+        return partitions;
+    }
+}
 
 SqlNodeList PartitionFieldList() :
 {
@@ -517,7 +579,7 @@ SqlCreate SqlCreateExternalTable(Span s, boolean replace) :
     |
         type = SimpleIdentifier()
     )
-    [ <PARTITIONED> <BY> <LPAREN> partitionFields = PartitionFieldList() 
<RPAREN> ]
+    [ <PARTITIONED> <BY> partitionFields = PartitionFieldsParens() ]
     [ <COMMENT> comment = StringLiteral() ]
     [ <LOCATION> location = StringLiteral() ]
     [ <TBLPROPERTIES> tblProperties = StringLiteral() ]
@@ -537,6 +599,62 @@ SqlCreate SqlCreateExternalTable(Span s, boolean replace) :
     }
 }
 
+/**
+ * Loosely following Flink's grammar: 
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sql/alter/#alter-table
+ * ALTER TABLE table_name
+ *   [ ADD COLUMNS <column_def, column_def, ...> ]
+ *   [ DROP COLUMNS <column_name, column_name> ]
+ *   [ ADD PARTITIONS <partition_field, partition_field, ...> ]
+ *   [ DROP PARTITIONS <partition_field, partition_field, ...> ]
+ *   [ SET (key1=val1, key2=val2, ...) ]
+ *   [ (RESET | UNSET) (key1, key2, ...) ]
+ */
+SqlCall SqlAlterTable(Span s, String scope) :
+{
+    final SqlNode tableName;
+    SqlNodeList columnsToDrop = null;
+    List<Schema.Field> columnsToAdd = null;
+    SqlNodeList partitionsToDrop = null;
+    SqlNodeList partitionsToAdd = null;
+    SqlNodeList setProps = null;
+    SqlNodeList resetProps = null;
+}
+{
+    <ALTER> {
+        s.add(this);
+    }
+    <TABLE>
+    tableName = CompoundIdentifier()
+
+    [ <DROP> (
+      <COLUMNS> columnsToDrop = ParenthesizedSimpleIdentifierList()
+        |
+      <PARTITIONS> partitionsToDrop = ParenthesizedLiteralOptionCommaList()
+    ) ]
+
+    [ <ADD> (
+      <COLUMNS> columnsToAdd = FieldListParens()
+        |
+      <PARTITIONS> partitionsToAdd = ParenthesizedLiteralOptionCommaList()
+    ) ]
+
+    [ (<RESET> | <UNSET>) <LPAREN> resetProps = ArgList() <RPAREN> ]
+    [ <SET> <LPAREN> setProps = PropertyList() <RPAREN> ]
+
+    {
+        return new SqlAlterTable(
+            s.end(this),
+            scope,
+            tableName,
+            columnsToAdd,
+            columnsToDrop,
+            partitionsToAdd,
+            partitionsToDrop,
+            setProps,
+            resetProps);
+    }
+}
+
 SqlCreate SqlCreateFunction(Span s, boolean replace) :
 {
     boolean isAggregate = false;
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogSchema.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogSchema.java
index 792e5b98bcd..e532355d856 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogSchema.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogSchema.java
@@ -68,6 +68,10 @@ public class CatalogSchema implements Schema {
     return catalog;
   }
 
+  public void updateProperties(Map<String, String> setProps, 
Collection<String> resetProps) {
+    catalog.updateProperties(setProps, resetProps);
+  }
+
   public @Nullable BeamCalciteSchema getCurrentDatabaseSchema() {
     return getSubSchema(catalog.currentDatabase());
   }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterCatalog.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterCatalog.java
new file mode 100644
index 00000000000..a67b5f89242
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterCatalog.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import static 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogManagerSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogSchema;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlAlter;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperator;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlWriter;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SqlAlterCatalog extends SqlAlter implements 
BeamSqlParser.ExecutableStatement {
+  private static final SqlOperator OPERATOR =
+      new SqlSpecialOperator("ALTER CATALOG", SqlKind.OTHER_DDL);
+  private final SqlIdentifier name;
+  private final @Nullable SqlNodeList setProps;
+  private final @Nullable SqlNodeList resetProps;
+
+  /**
+   * Called by the auto-generated {@link
+   * org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl}. 
Check SqlAlterCatalog
+   * in `sql/src/main/codegen/includesparserImpls.ftl` to see the 
corresponding SQL syntax
+   */
+  public SqlAlterCatalog(
+      SqlParserPos pos,
+      @Nullable String scope,
+      SqlNode name,
+      @Nullable SqlNodeList setProps,
+      @Nullable SqlNodeList resetProps) {
+    super(pos, scope);
+    this.name = SqlDdlNodes.getIdentifier(name, pos);
+    this.setProps = setProps;
+    this.resetProps = resetProps;
+  }
+
+  @Override
+  public void execute(CalcitePrepare.Context context) {
+    final Pair<CalciteSchema, String> pair = SqlDdlNodes.schema(context, true, 
name);
+    Schema schema = pair.left.schema;
+
+    if (!(schema instanceof CatalogManagerSchema)) {
+      throw SqlUtil.newContextException(
+          name.getParserPosition(),
+          RESOURCE.internal(
+              "Attempting to alter catalog '"
+                  + SqlDdlNodes.name(name)
+                  + "' with unexpected Calcite Schema of type "
+                  + schema.getClass()));
+    }
+
+    CatalogSchema catalogSchema =
+        ((CatalogManagerSchema) 
schema).getCatalogSchema(SqlDdlNodes.getString(name));
+
+    Map<String, String> setPropsMap = SqlDdlNodes.getStringMap(setProps);
+    Collection<String> resetPropsList = SqlDdlNodes.getStringList(resetProps);
+
+    ImmutableList.Builder<String> overlappingPropsBuilder = 
ImmutableList.builder();
+    
resetPropsList.stream().filter(setPropsMap::containsKey).forEach(overlappingPropsBuilder::add);
+    List<String> overlappingProps = overlappingPropsBuilder.build();
+    checkState(
+        overlappingProps.isEmpty(),
+        "Invalid %s call: Found overlapping properties between SET and RESET: 
%s.",
+        OPERATOR,
+        overlappingProps);
+
+    catalogSchema.updateProperties(setPropsMap, resetPropsList);
+  }
+
+  @Override
+  public void unparseAlterOperation(SqlWriter writer, int left, int right) {
+    unparse(writer, left, right);
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int left, int right) {
+    writer.keyword("ALTER");
+    writer.keyword("CATALOG");
+    name.unparse(writer, left, right);
+    if (setProps != null && !setProps.isEmpty()) {
+      writer.keyword("SET");
+      writer.keyword("(");
+      for (int i = 0; i < setProps.size(); i++) {
+        if (i > 0) {
+          writer.keyword(",");
+        }
+        SqlNode property = setProps.get(i);
+        checkState(
+            property instanceof SqlNodeList,
+            String.format(
+                "Unexpected properties entry '%s' of class '%s'", property, 
property.getClass()));
+        SqlNodeList kv = ((SqlNodeList) property);
+
+        kv.get(0).unparse(writer, left, right); // key
+        writer.keyword("=");
+        kv.get(1).unparse(writer, left, right); // value
+      }
+      writer.keyword(")");
+    }
+
+    if (resetProps != null) {
+      writer.keyword("RESET");
+      writer.keyword("(");
+      for (int i = 0; i < resetProps.size(); i++) {
+        if (i > 0) {
+          writer.keyword(",");
+        }
+        SqlNode field = resetProps.get(i);
+        field.unparse(writer, 0, 0);
+      }
+      writer.keyword(")");
+    }
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  public List<SqlNode> getOperandList() {
+    ImmutableList.Builder<SqlNode> operands = ImmutableList.builder();
+    operands.add(name);
+    if (setProps != null) {
+      operands.add(setProps);
+    }
+    if (resetProps != null) {
+      operands.add(resetProps);
+    }
+    return operands.build();
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterTable.java
new file mode 100644
index 00000000000..4c4fa4f8f1d
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterTable.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import static 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogManagerSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogSchema;
+import org.apache.beam.sdk.extensions.sql.impl.TableName;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlAlter;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperator;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlWriter;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SqlAlterTable extends SqlAlter implements 
BeamSqlParser.ExecutableStatement {
+  private static final SqlOperator OPERATOR =
+      new SqlSpecialOperator("ALTER TABLE", SqlKind.ALTER_TABLE);
+  private final SqlIdentifier name;
+  private final @Nullable List<Field> columnsToAdd;
+  private final @Nullable SqlNodeList columnsToDrop;
+  private final @Nullable SqlNodeList partitionsToAdd;
+  private final @Nullable SqlNodeList partitionsToDrop;
+  private final @Nullable SqlNodeList setProps;
+  private final @Nullable SqlNodeList resetProps;
+
+  /**
+   * Called by the auto-generated {@link
+   * org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl}. 
Check SqlAlterTable in
+   * `sql/src/main/codegen/includesparserImpls.ftl` to see the corresponding 
SQL syntax
+   */
+  public SqlAlterTable(
+      SqlParserPos pos,
+      @Nullable String scope,
+      SqlNode name,
+      @Nullable List<Field> columnsToAdd,
+      @Nullable SqlNodeList columnsToDrop,
+      @Nullable SqlNodeList partitionsToAdd,
+      @Nullable SqlNodeList partitionsToDrop,
+      @Nullable SqlNodeList setProps,
+      @Nullable SqlNodeList resetProps) {
+    super(pos, scope);
+    this.name = SqlDdlNodes.getIdentifier(name, pos);
+    this.columnsToAdd = columnsToAdd;
+    this.columnsToDrop = columnsToDrop;
+    this.partitionsToAdd = partitionsToAdd;
+    this.partitionsToDrop = partitionsToDrop;
+    this.setProps = setProps;
+    this.resetProps = resetProps;
+  }
+
+  @Override
+  public void execute(CalcitePrepare.Context context) {
+    final Pair<CalciteSchema, String> pair = SqlDdlNodes.schema(context, true, 
name);
+    TableName pathOverride = TableName.create(name.toString());
+    Schema schema = pair.left.schema;
+
+    BeamCalciteSchema beamCalciteSchema;
+    if (schema instanceof CatalogManagerSchema) {
+      CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) 
schema;
+      CatalogSchema catalogSchema =
+          pathOverride.catalog() != null
+              ? catalogManagerSchema.getCatalogSchema(pathOverride)
+              : catalogManagerSchema.getCurrentCatalogSchema();
+      beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride);
+    } else if (schema instanceof BeamCalciteSchema) {
+      beamCalciteSchema = (BeamCalciteSchema) schema;
+    } else {
+      throw SqlUtil.newContextException(
+          name.getParserPosition(),
+          RESOURCE.internal(
+              "Attempting to drop a table using unexpected Calcite Schema of 
type "
+                  + schema.getClass()));
+    }
+
+    if (beamCalciteSchema.getTable(pair.right) == null) {
+      // Table does not exist.
+      throw SqlUtil.newContextException(
+          name.getParserPosition(), RESOURCE.tableNotFound(name.toString()));
+    }
+
+    Map<String, String> setPropsMap = SqlDdlNodes.getStringMap(setProps);
+    List<String> resetPropsList = SqlDdlNodes.getStringList(resetProps);
+    List<String> columnsToDropList = SqlDdlNodes.getStringList(columnsToDrop);
+    List<String> partitionsToAddList = 
SqlDdlNodes.getStringList(partitionsToAdd);
+    List<String> partitionsToDropList = 
SqlDdlNodes.getStringList(partitionsToDrop);
+
+    AlterTableOps alterOps =
+        
beamCalciteSchema.getTableProvider().alterTable(SqlDdlNodes.name(name));
+
+    if (!setPropsMap.isEmpty() || !resetPropsList.isEmpty()) {
+      validateNonOverlappingProps(setPropsMap, resetPropsList);
+
+      alterOps.updateTableProperties(setPropsMap, resetPropsList);
+    }
+    if (!columnsToDropList.isEmpty() || (columnsToAdd != null && 
!columnsToAdd.isEmpty())) {
+      alterOps.updateSchema(firstNonNull(columnsToAdd, 
Collections.emptyList()), columnsToDropList);
+    }
+    if (!partitionsToDropList.isEmpty() || !partitionsToAddList.isEmpty()) {
+      alterOps.updatePartitionSpec(partitionsToAddList, partitionsToDropList);
+    }
+  }
+
+  private void validateNonOverlappingProps(
+      Map<String, String> setPropsMap, Collection<String> resetPropsList) {
+    ImmutableList.Builder<String> overlappingPropsBuilder = 
ImmutableList.builder();
+
+    
resetPropsList.stream().filter(setPropsMap::containsKey).forEach(overlappingPropsBuilder::add);
+
+    List<String> overlappingProps = overlappingPropsBuilder.build();
+    checkState(
+        overlappingProps.isEmpty(),
+        "Invalid '%s' call: Found overlapping properties between SET and 
RESET: %s.",
+        OPERATOR,
+        overlappingProps);
+  }
+
+  @Override
+  public void unparseAlterOperation(SqlWriter writer, int left, int right) {
+    unparse(writer, left, right);
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int left, int right) {
+    writer.keyword("ALTER");
+    writer.keyword("TABLE");
+    name.unparse(writer, left, right);
+
+    if (columnsToDrop != null && !columnsToDrop.isEmpty()) {
+      writer.keyword("DROP COLUMNS");
+      SqlWriter.Frame frame = writer.startList("(", ")");
+      for (String colName : SqlDdlNodes.getStringList(columnsToDrop)) {
+        writer.sep(",");
+        writer.identifier(colName, false);
+      }
+      writer.endList(frame);
+    }
+
+    if (columnsToAdd != null && !columnsToAdd.isEmpty()) {
+      writer.keyword("ADD COLUMNS");
+      SqlWriter.Frame frame = writer.startList("(", ")");
+      columnsToAdd.forEach(column -> unparseColumn(writer, column));
+      writer.endList(frame);
+    }
+
+    if (partitionsToDrop != null && !partitionsToDrop.isEmpty()) {
+      writer.keyword("DROP PARTITIONS");
+      SqlWriter.Frame frame = writer.startList("(", ")");
+      for (String partition : SqlDdlNodes.getStringList(partitionsToDrop)) {
+        writer.sep(",");
+        writer.identifier(partition, true);
+      }
+      writer.endList(frame);
+    }
+
+    if (partitionsToAdd != null && !partitionsToAdd.isEmpty()) {
+      writer.keyword("ADD PARTITIONS");
+      SqlWriter.Frame frame = writer.startList("(", ")");
+      for (String partition : SqlDdlNodes.getStringList(partitionsToAdd)) {
+        writer.sep(",");
+        writer.identifier(partition, true);
+      }
+      writer.endList(frame);
+    }
+
+    if (resetProps != null && !resetProps.isEmpty()) {
+      writer.keyword("RESET");
+      SqlWriter.Frame frame = writer.startList("(", ")");
+      for (SqlNode resetProp : resetProps) {
+        writer.sep(",");
+        resetProp.unparse(writer, 0, 0);
+      }
+      writer.endList(frame);
+    }
+
+    if (setProps != null && !setProps.isEmpty()) {
+      writer.keyword("SET");
+      SqlWriter.Frame frame = writer.startList("(", ")");
+      for (SqlNode setProp : setProps) {
+        writer.sep(",");
+        SqlNodeList kv = (SqlNodeList) setProp;
+        kv.get(0).unparse(writer, left, right); // key
+        writer.keyword("=");
+        kv.get(1).unparse(writer, left, right); // value
+      }
+      writer.endList(frame);
+    }
+  }
+
+  private void unparseColumn(SqlWriter writer, Field column) {
+    writer.sep(",");
+    writer.identifier(column.getName(), false);
+    writer.keyword(CalciteUtils.toSqlTypeName(column.getType()).name());
+
+    if (column.getType().getNullable() != null && 
!column.getType().getNullable()) {
+      writer.keyword("NOT NULL");
+    }
+
+    if (!Strings.isNullOrEmpty(column.getDescription())) {
+      writer.keyword("COMMENT");
+      writer.identifier(column.getDescription(), true);
+    }
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  public List<SqlNode> getOperandList() {
+    ImmutableList.Builder<SqlNode> operands = ImmutableList.builder();
+    operands.add(name);
+    if (setProps != null) {
+      operands.add(setProps);
+    }
+    if (resetProps != null) {
+      operands.add(resetProps);
+    }
+    return operands.build();
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
index de7903897b6..ab644145b4f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
@@ -159,7 +159,7 @@ public class SqlCreateExternalTable extends SqlCreate 
implements BeamSqlParser.E
       CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) 
schema;
       catalogManagerSchema.maybeRegisterProvider(pathOverride, 
SqlDdlNodes.getString(type));
 
-      CatalogSchema catalogSchema = ((CatalogManagerSchema) 
schema).getCatalogSchema(pathOverride);
+      CatalogSchema catalogSchema = 
catalogManagerSchema.getCatalogSchema(pathOverride);
       beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride);
     } else if (schema instanceof BeamCalciteSchema) {
       beamCalciteSchema = (BeamCalciteSchema) schema;
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
index 6f4d8ee79d9..6d6be5d5a12 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
@@ -19,18 +19,24 @@ package org.apache.beam.sdk.extensions.sql.impl.parser;
 
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
 import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
 import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlDataTypeSpec;
 import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
 import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlLiteral;
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
 import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
 import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.NlsString;
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Util;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Utilities concerning {@link SqlNode} for DDL. */
@@ -97,6 +103,41 @@ public class SqlDdlNodes {
     return literalValue == null ? null : literalValue.getValue();
   }
 
+  static List<String> getStringList(@Nullable SqlNodeList l) {
+    if (l == null || l.isEmpty()) {
+      return Collections.emptyList();
+    }
+    ImmutableList.Builder<String> resetPropsList = ImmutableList.builder();
+    for (SqlNode propNode : l) {
+      @Nullable String prop = SqlDdlNodes.getString(propNode);
+      if (prop != null) {
+        resetPropsList.add(prop);
+      }
+    }
+    return resetPropsList.build();
+  }
+
+  static Map<String, String> getStringMap(@Nullable SqlNodeList nodeList) {
+    if (nodeList == null || nodeList.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    for (SqlNode property : nodeList) {
+      checkState(
+          property instanceof SqlNodeList,
+          String.format(
+              "Unexpected properties entry '%s' of class '%s'", property, 
property.getClass()));
+      SqlNodeList kv = ((SqlNodeList) property);
+      checkState(kv.size() == 2, "Expected 2 items in properties entry, but 
got %s", kv.size());
+      String key = checkStateNotNull(SqlDdlNodes.getString(kv.get(0)));
+      String value = checkStateNotNull(SqlDdlNodes.getString(kv.get(1)));
+      builder.put(key, value);
+    }
+
+    return builder.build();
+  }
+
   static SqlIdentifier getIdentifier(SqlNode n, SqlParserPos pos) {
     if (n instanceof SqlIdentifier) {
       return (SqlIdentifier) n;
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java
index 4b838c9f418..4d4d0343c73 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java
@@ -84,7 +84,11 @@ public class SqlDropDatabase extends SqlDrop implements 
BeamSqlParser.Executable
 
     List<String> components = 
Lists.newArrayList(Splitter.on(".").split(databaseName.toString()));
     TableName pathOverride = TableName.create(components, "");
-    CatalogSchema catalogSchema = ((CatalogManagerSchema) 
schema).getCatalogSchema(pathOverride);
+    CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
+    CatalogSchema catalogSchema =
+        pathOverride.catalog() != null
+            ? catalogManagerSchema.getCatalogSchema(pathOverride)
+            : catalogManagerSchema.getCurrentCatalogSchema();
     catalogSchema.dropDatabase(databaseName, cascade, ifExists);
   }
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
index 0bc5cd91161..5d0cf223406 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
@@ -53,7 +53,11 @@ public class SqlDropTable extends SqlDropObject {
 
     BeamCalciteSchema beamCalciteSchema;
     if (schema instanceof CatalogManagerSchema) {
-      CatalogSchema catalogSchema = ((CatalogManagerSchema) 
schema).getCatalogSchema(pathOverride);
+      CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) 
schema;
+      CatalogSchema catalogSchema =
+          pathOverride.catalog() != null
+              ? catalogManagerSchema.getCatalogSchema(pathOverride)
+              : catalogManagerSchema.getCurrentCatalogSchema();
       beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride);
     } else if (schema instanceof BeamCalciteSchema) {
       beamCalciteSchema = (BeamCalciteSchema) schema;
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
index f0e3fa59ddc..9d06e471dbb 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
@@ -78,7 +78,7 @@ public class SqlUseDatabase extends SqlSetOption implements 
BeamSqlParser.Execut
     }
 
     CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
-    CatalogSchema catalogSchema = ((CatalogManagerSchema) 
schema).getCatalogSchema(pathOverride);
+    CatalogSchema catalogSchema = 
catalogManagerSchema.getCatalogSchema(pathOverride);
     // if database exists in a different catalog, we need to also switch to 
that catalog
     if (pathOverride.catalog() != null
         && !pathOverride
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
index c387a5ace10..cbf1b45c31e 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
@@ -88,6 +88,9 @@ public interface Catalog {
   /** User-specified configuration properties. */
   Map<String, String> properties();
 
+  /** Set some catalog properties. If a property already exists, it will be 
overridden. */
+  void updateProperties(Map<String, String> setProps, Collection<String> 
resetProps);
+
   /** Registers this {@link TableProvider} and propagates it to underlying 
{@link MetaStore}s. */
   void registerTableProvider(TableProvider provider);
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
index 7c0d8b9d32e..cdee6c93022 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
@@ -34,7 +34,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 
 public class InMemoryCatalog implements Catalog {
   private final String name;
-  private final Map<String, String> properties;
+  protected final Map<String, String> properties;
   protected final Map<String, TableProvider> tableProviders = new HashMap<>();
   private final Map<String, MetaStore> metaStores = new HashMap<>();
   private final HashSet<String> databases = new 
HashSet<>(Collections.singleton(DEFAULT));
@@ -77,6 +77,12 @@ public class InMemoryCatalog implements Catalog {
     return Preconditions.checkStateNotNull(properties, "InMemoryCatalog has 
not been initialized");
   }
 
+  @Override
+  public void updateProperties(Map<String, String> setProps, 
Collection<String> resetProps) {
+    properties.putAll(setProps);
+    resetProps.forEach(properties::remove);
+  }
+
   @Override
   public boolean createDatabase(String database) {
     return databases.add(database);
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/AlterTableOps.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/AlterTableOps.java
new file mode 100644
index 00000000000..3f1ea172168
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/AlterTableOps.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema;
+
+/**
+ * An interface that handles ALTER TABLE operations.
+ *
+ * <p>An instance is created and used when {@link 
TableProvider#alterTable(String)} is called.
+ */
+public interface AlterTableOps {
+  /**
+   * Updates a table's properties. Includes setting properties (which 
overwrites existing values),
+   * and/or resetting properties (removes values of given keys).
+   */
+  void updateTableProperties(Map<String, String> setProps, List<String> 
resetProps);
+
+  /** Updates a table's schema. Includes adding new columns and/or dropping 
existing columns. */
+  void updateSchema(List<Schema.Field> columnsToAdd, Collection<String> 
columnsToDrop);
+
+  /**
+   * Updates a table's partition spec, if applicable. Includes adding new 
partitions and/or dropping
+   * existing partitions.
+   */
+  void updatePartitionSpec(List<String> partitionsToAdd, Collection<String> 
partitionsToDrop);
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
index 9be8c96b7c9..595d9cba52f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
@@ -80,4 +80,9 @@ public interface TableProvider {
   default boolean supportsPartitioning(Table table) {
     return false;
   }
+
+  default AlterTableOps alterTable(String name) {
+    throw new UnsupportedOperationException(
+        String.format("ALTER is not supported for table '%s' of type '%s'.", 
name, getTableType()));
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/AlterTestTableOps.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/AlterTestTableOps.java
new file mode 100644
index 00000000000..0ab17d3df89
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/AlterTestTableOps.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.test;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.TableWithRows;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+
+public class AlterTestTableOps implements AlterTableOps {
+  private final TableWithRows tableWithRows;
+
+  AlterTestTableOps(TableWithRows tableWithRows) {
+    this.tableWithRows = tableWithRows;
+  }
+
+  @Override
+  public void updateTableProperties(Map<String, String> setProps, List<String> 
resetProps) {
+    ObjectNode props = tableWithRows.getTable().getProperties();
+    resetProps.forEach(props::remove);
+    setProps.forEach(props::put);
+    
tableWithRows.setTable(tableWithRows.getTable().toBuilder().properties(props).build());
+  }
+
+  @Override
+  public void updateSchema(List<Field> columnsToAdd, Collection<String> 
columnsToDrop) {
+    if (!columnsToAdd.isEmpty() && !tableWithRows.getRows().isEmpty()) {
+      ImmutableList.Builder<String> requiredFields = ImmutableList.builder();
+      for (Field f : columnsToAdd) {
+        if (!f.getType().getNullable()) {
+          requiredFields.add(f.getName());
+        }
+      }
+      Preconditions.checkArgument(
+          requiredFields.build().isEmpty(),
+          "Cannot add required fields %s because table '%s' already contains 
rows.",
+          requiredFields.build(),
+          tableWithRows.getTable().getName());
+    }
+
+    // update the schema
+    List<Field> schemaFields = 
tableWithRows.getTable().getSchema().getFields();
+    ImmutableList.Builder<Field> newSchemaFields = ImmutableList.builder();
+    // remove dropped fields
+    schemaFields.stream()
+        .filter(f -> !columnsToDrop.contains(f.getName()))
+        .forEach(newSchemaFields::add);
+    // add new fields
+    newSchemaFields.addAll(columnsToAdd);
+    Schema newSchema = Schema.of(newSchemaFields.build().toArray(new 
Field[0]));
+    
tableWithRows.setTable(tableWithRows.getTable().toBuilder().schema(newSchema).build());
+
+    // update existing rows
+    List<Row> rows = tableWithRows.getRows();
+    ImmutableList.Builder<Row> newRows = 
ImmutableList.builderWithExpectedSize(rows.size());
+    for (Row row : rows) {
+      Map<String, Object> values = new HashMap<>();
+      // add existing values, minus dropped columns
+      for (Field field : schemaFields) {
+        String name = field.getName();
+        if (!columnsToDrop.contains(name)) {
+          values.put(name, row.getValue(name));
+        }
+      }
+      Row newRow = Row.withSchema(newSchema).withFieldValues(values).build();
+      newRows.add(newRow);
+    }
+    tableWithRows.setRows(new CopyOnWriteArrayList<Row>(newRows.build()));
+  }
+
+  @Override
+  public void updatePartitionSpec(
+      List<String> partitionsToAdd, Collection<String> partitionsToDrop) {
+    throw new UnsupportedOperationException(
+        TestTableProvider.class.getSimpleName() + " does not support 
partitions.");
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
index 375cb42c490..365ed9fbc2c 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
@@ -36,6 +36,7 @@ import 
org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
 import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
 import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -50,6 +51,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -107,6 +109,13 @@ public class TestTableProvider extends 
InMemoryMetaTableProvider {
         .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().table));
   }
 
+  @Override
+  public AlterTableOps alterTable(String name) {
+    TableWithRows table =
+        Preconditions.checkArgumentNotNull(tables().get(name), "Could not find 
table '%s'", name);
+    return new AlterTestTableOps(table);
+  }
+
   @Override
   public synchronized BeamSqlTable buildBeamSqlTable(Table table) {
     return new InMemoryTable(tables().get(table.getName()));
@@ -133,9 +142,21 @@ public class TestTableProvider extends 
InMemoryMetaTableProvider {
       this.rows = new CopyOnWriteArrayList<>();
     }
 
+    public Table getTable() {
+      return table;
+    }
+
+    void setTable(Table table) {
+      this.table = table;
+    }
+
     public List<Row> getRows() {
       return rows;
     }
+
+    void setRows(List<Row> rows) {
+      this.rows = rows;
+    }
   }
 
   private static class InMemoryTable extends BaseBeamTable {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
index 83b8685c3fe..8892cd889fd 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -150,4 +151,14 @@ public class InMemoryMetaStore implements MetaStore {
 
     throw new IllegalStateException("No TableProvider registered for table 
type: " + type);
   }
+
+  @Override
+  public AlterTableOps alterTable(String name) {
+    if (!tables.containsKey(name)) {
+      throw new IllegalArgumentException("No such table: " + name);
+    }
+
+    Table table = tables.get(name);
+    return getProvider(table.getType()).alterTable(name);
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliCatalogTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliCatalogTest.java
index 0164c634814..1fc9ebcbe18 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliCatalogTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliCatalogTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.parser.SqlAlterCatalog;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog;
 import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
@@ -35,6 +36,13 @@ import 
org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.runtime.CalciteContextException;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlLiteral;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.dialect.AnsiSqlDialect;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.pretty.SqlPrettyWriter;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.junit.Before;
 import org.junit.Rule;
@@ -330,4 +338,89 @@ public class BeamSqlCliCatalogTest {
     cli.execute("DROP TABLE catalog_1.db_1.person");
     assertNull(metastoreDb1.getTable("person"));
   }
+
+  @Test
+  public void testAlterCatalog() {
+    cli.execute("CREATE CATALOG my_catalog TYPE 'local' 
PROPERTIES('foo'='abc', 'bar'='xyz')");
+    cli.execute("USE CATALOG my_catalog");
+    assertEquals(
+        ImmutableMap.of("foo", "abc", "bar", "xyz"), 
catalogManager.currentCatalog().properties());
+    cli.execute("ALTER CATALOG my_catalog SET ('foo'='123', 'new'='val')");
+    assertEquals(
+        ImmutableMap.of("foo", "123", "bar", "xyz", "new", "val"),
+        catalogManager.currentCatalog().properties());
+    cli.execute("ALTER CATALOG my_catalog RESET ('foo', 'bar')");
+    assertEquals(ImmutableMap.of("new", "val"), 
catalogManager.currentCatalog().properties());
+  }
+
+  @Test
+  public void testUnparse_SetProperties() {
+    SqlNode catalogName = new SqlIdentifier("test_catalog", POS);
+
+    SqlNodeList setProps = new SqlNodeList(POS);
+    setProps.add(createPropertyPair("k1", "v1"));
+    setProps.add(createPropertyPair("k2", "v2"));
+
+    SqlAlterCatalog alterCatalog = new SqlAlterCatalog(POS, null, catalogName, 
setProps, null);
+
+    String expectedSql = "ALTER CATALOG `test_catalog` SET ('k1' = 'v1', 'k2' 
= 'v2')";
+    assertEquals(expectedSql, toSql(alterCatalog));
+  }
+
+  @Test
+  public void testUnparse_ResetProperties() {
+    SqlNode catalogName = new SqlIdentifier("test_catalog", POS);
+
+    SqlNodeList resetProps = new SqlNodeList(POS);
+    resetProps.add(SqlLiteral.createCharString("k1", POS));
+    resetProps.add(SqlLiteral.createCharString("k2", POS));
+
+    SqlAlterCatalog alterCatalog = new SqlAlterCatalog(POS, null, catalogName, 
null, resetProps);
+
+    String expectedSql = "ALTER CATALOG `test_catalog` RESET ('k1', 'k2')";
+    assertEquals(expectedSql, toSql(alterCatalog));
+  }
+
+  @Test
+  public void testUnparse_SetAndResetProperties() {
+    SqlNode catalogName = new SqlIdentifier("my_cat", POS);
+
+    SqlNodeList setProps = new SqlNodeList(POS);
+    setProps.add(createPropertyPair("k1", "v1"));
+
+    SqlNodeList resetProps = new SqlNodeList(POS);
+    resetProps.add(SqlLiteral.createCharString("k2", POS));
+
+    SqlAlterCatalog alterCatalog =
+        new SqlAlterCatalog(POS, null, catalogName, setProps, resetProps);
+
+    String expectedSql = "ALTER CATALOG `my_cat` SET ('k1' = 'v1') RESET 
('k2')";
+    assertEquals(expectedSql, toSql(alterCatalog));
+  }
+
+  private static final SqlParserPos POS = SqlParserPos.ZERO;
+
+  /**
+   * Helper to execute the unparse mechanism using a PrettyWriter. This 
triggers SqlAlter.unparse ->
+   * SqlAlterCatalog.unparseAlterOperation.
+   */
+  private String toSql(SqlAlterCatalog node) {
+    SqlPrettyWriter writer = new SqlPrettyWriter(AnsiSqlDialect.DEFAULT);
+    writer.setAlwaysUseParentheses(false);
+    writer.setSelectListItemsOnSeparateLines(false);
+    writer.setIndentation(0);
+    node.unparse(writer, 0, 0);
+    return writer.toSqlString().getSql();
+  }
+
+  /**
+   * Helper to create the structure expected by SqlAlterCatalog for K=V pairs. 
Expects a SqlNodeList
+   * containing two literals [Key, Value].
+   */
+  private SqlNodeList createPropertyPair(String key, String value) {
+    SqlNodeList pair = new SqlNodeList(POS);
+    pair.add(SqlLiteral.createCharString(key, POS));
+    pair.add(SqlLiteral.createCharString(value, POS));
+    return pair;
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
index ffbdeb84f13..522d9c35bbf 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
@@ -24,21 +24,37 @@ import static 
org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimest
 import static org.apache.beam.sdk.schemas.Schema.toSchema;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.oneOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 import java.time.LocalDate;
 import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.stream.Stream;
 import org.apache.beam.sdk.extensions.sql.impl.ParseException;
+import org.apache.beam.sdk.extensions.sql.impl.parser.SqlAlterTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlLiteral;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.dialect.AnsiSqlDialect;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.pretty.SqlPrettyWriter;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -300,4 +316,237 @@ public class BeamSqlCliTest {
     // test TIMESTAMP field
     assertEquals(parseTimestampWithUTCTimeZone("2018-07-01 21:26:07.123"), 
row.getDateTime("f_ts"));
   }
+
+  @Test
+  public void testAlterTableSchema() {
+    InMemoryCatalogManager catalogManager = new InMemoryCatalogManager();
+    TestTableProvider provider = new TestTableProvider();
+    catalogManager.registerTableProvider(provider);
+    BeamSqlCli cli = new BeamSqlCli().catalogManager(catalogManager);
+
+    cli.execute(
+        "CREATE EXTERNAL TABLE test_table(id integer not null, str varchar not 
null, fl float) type 'test'");
+    cli.execute("INSERT INTO test_table VALUES (1, 'a', 0.1), (2, 'b', 0.2), 
(3, 'c', 0.3)");
+    TestTableProvider.TableWithRows tableWithRows = 
provider.tables().get("test_table");
+    assertNotNull(tableWithRows);
+    Schema initialSchema =
+        Schema.builder()
+            .addInt32Field("id")
+            .addStringField("str")
+            .addNullableFloatField("fl")
+            .build();
+    assertEquals(initialSchema, tableWithRows.getTable().getSchema());
+    List<Row> initialRows =
+        Arrays.asList(
+            Row.withSchema(initialSchema).addValues(1, "a", 0.1f).build(),
+            Row.withSchema(initialSchema).addValues(2, "b", 0.2f).build(),
+            Row.withSchema(initialSchema).addValues(3, "c", 0.3f).build());
+    assertThat(initialRows, 
everyItem(is(oneOf(tableWithRows.getRows().toArray(new Row[0])))));
+
+    cli.execute(
+        "ALTER TABLE test_table DROP COLUMNS (str, fl) ADD COLUMNS (newBool 
boolean, newLong bigint)");
+    cli.execute("INSERT INTO test_table VALUES (4, true, 4), (5, false, 5), 
(6, false, 6)");
+    Schema newSchema =
+        Schema.builder()
+            .addInt32Field("id")
+            .addNullableBooleanField("newBool")
+            .addNullableInt64Field("newLong")
+            .build();
+    assertEquals(newSchema, tableWithRows.getTable().getSchema());
+
+    // existing rows should have the corresponding values dropped
+    List<Row> newRows =
+        Arrays.asList(
+            Row.withSchema(newSchema).addValues(1, null, null).build(),
+            Row.withSchema(newSchema).addValues(2, null, null).build(),
+            Row.withSchema(newSchema).addValues(3, null, null).build(),
+            Row.withSchema(newSchema).addValues(4, true, 4L).build(),
+            Row.withSchema(newSchema).addValues(5, false, 5L).build(),
+            Row.withSchema(newSchema).addValues(6, false, 6L).build());
+    assertThat(newRows, everyItem(is(oneOf(tableWithRows.getRows().toArray(new 
Row[0])))));
+  }
+
+  @Test
+  public void testAlterTableProperties() {
+    InMemoryCatalogManager catalogManager = new InMemoryCatalogManager();
+    TestTableProvider provider = new TestTableProvider();
+    catalogManager.registerTableProvider(provider);
+    BeamSqlCli cli = new BeamSqlCli().catalogManager(catalogManager);
+
+    cli.execute(
+        "CREATE EXTERNAL TABLE test_table(id integer, str varchar) type 'test' 
"
+            + "TBLPROPERTIES '{ \"foo\" : \"123\", \"bar\" : \"abc\"}'");
+    TestTableProvider.TableWithRows tableWithRows = 
provider.tables().get("test_table");
+    assertNotNull(tableWithRows);
+    assertEquals("123", 
tableWithRows.getTable().getProperties().get("foo").asText());
+    assertEquals("abc", 
tableWithRows.getTable().getProperties().get("bar").asText());
+
+    cli.execute("ALTER TABLE test_table RESET('bar') SET('foo'='456', 
'baz'='xyz')");
+    assertEquals("456", 
tableWithRows.getTable().getProperties().get("foo").asText());
+    assertEquals("xyz", 
tableWithRows.getTable().getProperties().get("baz").asText());
+    assertFalse(tableWithRows.getTable().getProperties().has("bar"));
+  }
+
+  private static final SqlParserPos POS = SqlParserPos.ZERO;
+
+  @Test
+  public void testUnparseAlter_AddColumns() {
+    SqlNode tableName = new SqlIdentifier("test_table", POS);
+
+    Field col1 = Field.of("new_col_1", 
Schema.FieldType.STRING).withNullable(true);
+    Field col2 =
+        Field.of("new_col_2", Schema.FieldType.INT64)
+            .withNullable(false)
+            .withDescription("description for col2");
+
+    List<Field> columnsToAdd = Arrays.asList(col1, col2);
+
+    SqlAlterTable alterTable =
+        new SqlAlterTable(POS, null, tableName, columnsToAdd, null, null, 
null, null, null);
+
+    String expectedSql =
+        "ALTER TABLE `test_table` "
+            + "ADD COLUMNS (`new_col_1` VARCHAR, "
+            + "`new_col_2` BIGINT NOT NULL COMMENT `description for col2`)";
+
+    assertEquals(expectedSql, toSql(alterTable));
+  }
+
+  @Test
+  public void testUnparseAlter_DropColumns() {
+    SqlNode tableName = new SqlIdentifier("test_table", POS);
+
+    SqlNodeList columnsToDrop = createStringList("col_to_drop_1", 
"col_to_drop_2");
+
+    SqlAlterTable alterTable =
+        new SqlAlterTable(POS, null, tableName, null, columnsToDrop, null, 
null, null, null);
+
+    String expectedSql = "ALTER TABLE `test_table` DROP COLUMNS 
(`col_to_drop_1`, `col_to_drop_2`)";
+    assertEquals(expectedSql, toSql(alterTable));
+  }
+
+  @Test
+  public void testUnparseAlter_AddColumnsAndDropColumns() {
+    SqlNode tableName = new SqlIdentifier("test_table", POS);
+
+    // Setup Add
+    Field col1 = Field.of("new_col", 
Schema.FieldType.BOOLEAN).withNullable(true);
+    List<Field> columnsToAdd = Arrays.asList(col1);
+
+    // Setup Drop
+    SqlNodeList columnsToDrop = createStringList("col_to_drop");
+
+    SqlAlterTable alterTable =
+        new SqlAlterTable(
+            POS, null, tableName, columnsToAdd, columnsToDrop, null, null, 
null, null);
+
+    // unparses DROP before ADD
+    String expectedSql =
+        "ALTER TABLE `test_table` "
+            + "DROP COLUMNS (`col_to_drop`) "
+            + "ADD COLUMNS (`new_col` BOOLEAN)";
+
+    assertEquals(expectedSql, toSql(alterTable));
+  }
+
+  @Test
+  public void testUnparseAlter_AddAndDropPartitions() {
+    SqlNode tableName = new SqlIdentifier("test_table", POS);
+
+    SqlNodeList partsToAdd = createStringList("p1", "p2");
+    SqlNodeList partsToDrop = createStringList("p3");
+
+    SqlAlterTable alterTable =
+        new SqlAlterTable(POS, null, tableName, null, null, partsToAdd, 
partsToDrop, null, null);
+
+    // unparses DROP before ADD
+    String expectedSql =
+        "ALTER TABLE `test_table` DROP PARTITIONS (`p3`) ADD PARTITIONS (`p1`, 
`p2`)";
+
+    assertEquals(expectedSql, toSql(alterTable));
+  }
+
+  @Test
+  public void testUnparseAlter_TableProperties() {
+    SqlNode tableName = new SqlIdentifier("test_table", POS);
+
+    SqlNodeList setProps = new SqlNodeList(POS);
+    setProps.add(createPropertyPair("prop1", "val1"));
+
+    SqlNodeList resetProps = createStringList("prop2");
+
+    SqlAlterTable alterTable =
+        new SqlAlterTable(POS, null, tableName, null, null, null, null, 
setProps, resetProps);
+
+    // unparses RESET before SET
+    String expectedSql = "ALTER TABLE `test_table` RESET ('prop2') SET 
('prop1' = 'val1')";
+
+    assertEquals(expectedSql, toSql(alterTable));
+  }
+
+  @Test
+  public void testUnparseAlter_AllOperations() {
+    // A comprehensive test combining all clauses to verify strict ordering
+    SqlNode tableName = new SqlIdentifier("full_table", POS);
+
+    List<Field> addCols = Collections.singletonList(Field.of("c1", 
Schema.FieldType.BOOLEAN));
+    SqlNodeList dropCols = createStringList("c_old");
+    SqlNodeList addParts = createStringList("p_new");
+    SqlNodeList dropParts = createStringList("p_old");
+    SqlNodeList setProps = new SqlNodeList(POS);
+    setProps.add(createPropertyPair("k", "v"));
+    SqlNodeList resetProps = createStringList("k_reset");
+
+    SqlAlterTable alterTable =
+        new SqlAlterTable(
+            POS, null, tableName, addCols, dropCols, addParts, dropParts, 
setProps, resetProps);
+
+    // Expected Order based on source code:
+    // 1. DROP COLUMNS
+    // 2. ADD COLUMNS
+    // 3. DROP PARTITIONS
+    // 4. ADD PARTITIONS
+    // 5. RESET
+    // 6. SET
+    String expectedSql =
+        "ALTER TABLE `full_table` "
+            + "DROP COLUMNS (`c_old`) "
+            + "ADD COLUMNS (`c1` BOOLEAN NOT NULL) "
+            + "DROP PARTITIONS (`p_old`) "
+            + "ADD PARTITIONS (`p_new`) "
+            + "RESET ('k_reset') "
+            + "SET ('k' = 'v')";
+
+    assertEquals(expectedSql, toSql(alterTable));
+  }
+
+  /** Helper to execute the unparse mechanism using a PrettyWriter. */
+  private String toSql(SqlAlterTable node) {
+    SqlPrettyWriter writer = new SqlPrettyWriter(AnsiSqlDialect.DEFAULT);
+    writer.setAlwaysUseParentheses(false);
+    writer.setSelectListItemsOnSeparateLines(false);
+    writer.setIndentation(0);
+    node.unparse(writer, 0, 0);
+    return writer.toSqlString().getSql();
+  }
+
+  /**
+   * Helper to create a list of string literals. Useful for Drop Columns, 
Add/Drop Partitions, Reset
+   * Props.
+   */
+  private SqlNodeList createStringList(String... values) {
+    SqlNodeList list = new SqlNodeList(POS);
+    for (String val : values) {
+      list.add(SqlLiteral.createCharString(val, POS));
+    }
+    return list;
+  }
+
+  /** Helper to create Key=Value pair for Set Properties. */
+  private SqlNodeList createPropertyPair(String key, String value) {
+    SqlNodeList pair = new SqlNodeList(POS);
+    pair.add(SqlLiteral.createCharString(key, POS));
+    pair.add(SqlLiteral.createCharString(value, POS));
+    return pair;
+  }
 }
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index 7603e2c6259..748dd319c07 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.iceberg;
 
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -27,18 +28,23 @@ import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.UpdatePartitionSpec;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.types.Type;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.checkerframework.dataflow.qual.Pure;
@@ -143,7 +149,10 @@ public abstract class IcebergCatalogConfig implements 
Serializable {
   }
 
   public void createTable(
-      String tableIdentifier, Schema tableSchema, @Nullable List<String> 
partitionFields) {
+      String tableIdentifier,
+      Schema tableSchema,
+      @Nullable List<String> partitionFields,
+      Map<String, String> properties) {
     TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
     org.apache.iceberg.Schema icebergSchema = 
IcebergUtils.beamSchemaToIcebergSchema(tableSchema);
     PartitionSpec icebergSpec = 
PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
@@ -153,7 +162,7 @@ public abstract class IcebergCatalogConfig implements 
Serializable {
           icebergIdentifier,
           icebergSchema,
           icebergSpec);
-      catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec);
+      catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec, 
properties);
       LOG.info("Successfully created table '{}'.", icebergIdentifier);
     } catch (AlreadyExistsException e) {
       throw new TableAlreadyExistsException(e);
@@ -164,28 +173,92 @@ public abstract class IcebergCatalogConfig implements 
Serializable {
     TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
     try {
       Table table = catalog().loadTable(icebergIdentifier);
-      return IcebergTableInfo.create(
-          tableIdentifier,
-          IcebergUtils.icebergSchemaToBeamSchema(table.schema()),
-          table.properties());
+      return new IcebergTableInfo(tableIdentifier, table);
     } catch (NoSuchTableException ignored) {
       return null;
     }
   }
 
   // Helper class to pass information to Beam SQL module without relying on 
Iceberg deps
-  @AutoValue
-  public abstract static class IcebergTableInfo {
-    public abstract String getIdentifier();
+  public static class IcebergTableInfo {
+    private final String identifier;
+    private final Table table;
 
-    public abstract Schema getSchema();
+    IcebergTableInfo(String identifier, Table table) {
+      this.identifier = identifier;
+      this.table = table;
+    }
+
+    public String getIdentifier() {
+      return identifier;
+    }
+
+    public Schema getSchema() {
+      return IcebergUtils.icebergSchemaToBeamSchema(table.schema());
+    }
+
+    public Map<String, String> getProperties() {
+      return table.properties();
+    }
+
+    public void updateTableProps(Map<String, String> setProps, List<String> 
resetProps) {
+      if (setProps.isEmpty() && resetProps.isEmpty()) {
+        return;
+      }
+
+      UpdateProperties update = table.updateProperties();
+      resetProps.forEach(update::remove);
+      setProps.forEach(update::set);
+
+      update.commit();
+    }
 
-    public abstract Map<String, String> getProperties();
+    public void updateSchema(List<Schema.Field> columnsToAdd, 
Collection<String> columnsToDrop) {
+      if (columnsToAdd.isEmpty() && columnsToDrop.isEmpty()) {
+        return;
+      }
+      UpdateSchema update = table.updateSchema();
+      ImmutableList.Builder<String> requiredColumns = ImmutableList.builder();
+
+      for (Schema.Field col : columnsToAdd) {
+        String name = col.getName();
+        Type type = 
IcebergUtils.beamFieldTypeToIcebergFieldType(col.getType(), 0).type;
+        String desc = col.getDescription();
+
+        if (col.getType().getNullable()) {
+          if (desc.isEmpty()) {
+            update.addColumn(name, type);
+          } else {
+            update.addColumn(name, type, desc);
+          }
+        } else {
+          requiredColumns.add(name);
+        }
+      }
+      if (!requiredColumns.build().isEmpty()) {
+        throw new UnsupportedOperationException(
+            "Adding required columns is not yet supported. "
+                + "Encountered required columns: "
+                + requiredColumns.build());
+      }
+
+      columnsToDrop.forEach(update::deleteColumn);
 
-    static IcebergTableInfo create(
-        String identifier, Schema schema, Map<String, String> properties) {
-      return new AutoValue_IcebergCatalogConfig_IcebergTableInfo(identifier, 
schema, properties);
-    };
+      update.commit();
+    }
+
+    public void updatePartitionSpec(
+        List<String> partitionsToAdd, Collection<String> partitionsToDrop) {
+      if (partitionsToAdd.isEmpty() && partitionsToDrop.isEmpty()) {
+        return;
+      }
+      UpdatePartitionSpec update = table.updateSpec();
+
+      
partitionsToDrop.stream().map(PartitionUtils::toIcebergTerm).forEach(update::removeField);
+      
partitionsToAdd.stream().map(PartitionUtils::toIcebergTerm).forEach(update::addField);
+
+      update.commit();
+    }
   }
 
   public boolean dropTable(String tableIdentifier) {
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
index 4b94663c64c..2b3117f8bf8 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
@@ -22,11 +22,14 @@ import static 
org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Term;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 class PartitionUtils {
@@ -90,4 +93,38 @@ class PartitionUtils {
 
     return builder.build();
   }
+
+  private static final Map<Pattern, Function<Matcher, Term>> TERMS =
+      ImmutableMap.of(
+          HOUR,
+          matcher -> Expressions.hour(checkStateNotNull(matcher.group(1))),
+          DAY,
+          matcher -> Expressions.day(checkStateNotNull(matcher.group(1))),
+          MONTH,
+          matcher -> Expressions.month(checkStateNotNull(matcher.group(1))),
+          YEAR,
+          matcher -> Expressions.year(checkStateNotNull(matcher.group(1))),
+          TRUNCATE,
+          matcher ->
+              Expressions.truncate(
+                  checkStateNotNull(matcher.group(1)),
+                  Integer.parseInt(checkStateNotNull(matcher.group(2)))),
+          BUCKET,
+          matcher ->
+              Expressions.bucket(
+                  checkStateNotNull(matcher.group(1)),
+                  Integer.parseInt(checkStateNotNull(matcher.group(2)))),
+          IDENTITY,
+          matcher -> Expressions.ref(checkStateNotNull(matcher.group(1))));
+
+  static Term toIcebergTerm(String field) {
+    for (Map.Entry<Pattern, Function<Matcher, Term>> entry : TERMS.entrySet()) 
{
+      Matcher matcher = entry.getKey().matcher(field);
+      if (matcher.find()) {
+        return entry.getValue().apply(matcher);
+      }
+    }
+
+    throw new IllegalArgumentException("Could not find a partition term for '" 
+ field + "'.");
+  }
 }
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
index 893e24b6155..591467ce0d0 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
@@ -823,14 +823,8 @@ public class FilterUtilsTest {
 
     ImmutableSet<Operation> inOperations = ImmutableSet.of(Operation.IN, 
Operation.NOT_IN);
     if (inOperations.contains(expected.op())) {
-      System.out.printf(
-          "xxx op: %s, literals: %s, ref: %s%n",
-          expected.op(), expected.literals(), expected.ref().name());
       assertEquals(expected.literals(), actual.literals());
     } else {
-      System.out.printf(
-          "xxx op: %s, literal: %s, ref: %s%n",
-          expected.op(), expected.literal(), expected.ref().name());
       assertEquals(expected.literal(), actual.literal());
     }
   }

Reply via email to