This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fbd0f5c634 [IcebergIO] Integrate with Beam SQL (#34799)
1fbd0f5c634 is described below

commit 1fbd0f5c634160cc9eeded1c43b8f41012ed404c
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Jun 12 13:56:38 2025 -0400

    [IcebergIO] Integrate with Beam SQL (#34799)
    
    * add Iceberg table provider and tests
    
    * properties go in the tableprovider initialization
    
    * tobuilder
    
    * streaming integration test
    
    * spotless
    
    * extend test to include multi nested types; fix iceberg <-> conversion 
logic
    
    * add to changes.md
    
    * spotless
    
    * fix tests
    
    * clean
    
    * update CHANGES
    
    * add projection pushdown and column pruning
    
    * spotless
    
    * fixes
    
    * fixes
    
    * updates
    
    * sync with HEAD and use new Catalog implementation
    
    * mark new interfaces @internal
    
    * spotless
    
    * fix unparse method
---
 .../IO_Iceberg_Integration_Tests.json              |   2 +-
 .../beam_PostCommit_Python_Xlang_IO_Direct.json    |   2 +-
 .github/trigger_files/beam_PostCommit_SQL.json     |   2 +-
 CHANGES.md                                         |   2 +
 sdks/java/extensions/sql/build.gradle              |   4 +
 .../sql/impl/parser/SqlCreateCatalog.java          |  18 +-
 .../sdk/extensions/sql/meta/catalog/Catalog.java   |   2 +
 .../sql/meta/catalog/CatalogManager.java           |   2 +
 .../sql/meta/catalog/CatalogRegistrar.java         |   3 +
 .../sql/meta/catalog/InMemoryCatalog.java          |   7 +-
 .../sql/meta/catalog/InMemoryCatalogRegistrar.java |   6 +-
 .../iceberg/IcebergCatalog.java}                   |  18 +-
 .../sql/meta/provider/iceberg/IcebergFilter.java   | 144 ++++++++++
 .../sql/meta/provider/iceberg/IcebergTable.java    | 176 +++++++++++++
 .../provider/iceberg/IcebergTableProvider.java     |  62 +++++
 .../iceberg/package-info.java}                     |  11 +-
 .../sql/meta/store/InMemoryMetaStore.java          |   4 +
 .../beam/sdk/extensions/sql/PubsubToIcebergIT.java | 267 +++++++++++++++++++
 .../meta/provider/iceberg/IcebergFilterTest.java   | 121 +++++++++
 .../meta/provider/iceberg/IcebergReadWriteIT.java  | 289 +++++++++++++++++++++
 .../provider/iceberg/IcebergTableProviderTest.java |  83 ++++++
 sdks/java/io/expansion-service/build.gradle        |   7 +-
 sdks/java/io/iceberg/build.gradle                  |   4 +-
 .../apache/beam/sdk/io/iceberg/FilterUtils.java    |  28 +-
 .../beam/sdk/io/iceberg/IcebergCatalogConfig.java  |   2 +
 .../apache/beam/sdk/io/iceberg/ScanTaskReader.java |   1 -
 26 files changed, 1216 insertions(+), 51 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json 
b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 34a6e02150e..b73af5e61a4 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
     "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-    "modification": 4
+    "modification": 1
 }
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json 
b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
index e0266d62f2e..f1ba03a243e 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-  "modification": 4
+  "modification": 5
 }
diff --git a/.github/trigger_files/beam_PostCommit_SQL.json 
b/.github/trigger_files/beam_PostCommit_SQL.json
index e3d6056a5de..b2683333323 100644
--- a/.github/trigger_files/beam_PostCommit_SQL.json
+++ b/.github/trigger_files/beam_PostCommit_SQL.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-  "modification": 1
+  "modification": 2
 }
diff --git a/CHANGES.md b/CHANGES.md
index f913302cb9d..a7f96fc3fdc 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -75,10 +75,12 @@
 ## I/Os
 
 * Support for X source added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
+* [IcebergIO] Now available with Beam SQL! 
([#34799](https://github.com/apache/beam/pull/34799))
 * [IcebergIO] Support reading with column pruning 
([#34856](https://github.com/apache/beam/pull/34856))
 * [IcebergIO] Support reading with pushdown filtering 
([#34827](https://github.com/apache/beam/pull/34827))
 
 ## New Features / Improvements
+* [Beam SQL] Introducing Beam Catalogs 
([#35223](https://github.com/apache/beam/pull/35223))
 * Adding Google Storage Requests Pays feature 
(Golang)([#30747](https://github.com/apache/beam/issues/30747)).
 * X feature added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
 * [Python] Prism runner now auto-enabled for some Python pipelines using the 
direct runner ([#34921](https://github.com/apache/beam/pull/34921)).
diff --git a/sdks/java/extensions/sql/build.gradle 
b/sdks/java/extensions/sql/build.gradle
index 6f34891c2d3..a73bd2518fe 100644
--- a/sdks/java/extensions/sql/build.gradle
+++ b/sdks/java/extensions/sql/build.gradle
@@ -74,6 +74,10 @@ dependencies {
   fmppTask "org.freemarker:freemarker:2.3.31"
   fmppTemplates library.java.vendored_calcite_1_28_0
   implementation project(path: ":sdks:java:core", configuration: "shadow")
+  implementation project(":sdks:java:managed")
+  implementation project(":sdks:java:io:iceberg")
+  runtimeOnly project(":sdks:java:io:iceberg:bqms")
+  runtimeOnly project(":sdks:java:io:iceberg:hive")
   implementation project(":sdks:java:extensions:avro")
   implementation project(":sdks:java:extensions:join-library")
   permitUnusedDeclared project(":sdks:java:extensions:join-library") // 
BEAM-11761
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateCatalog.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateCatalog.java
index 7265c3103c1..c1d96eea7ba 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateCatalog.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateCatalog.java
@@ -99,16 +99,16 @@ public class SqlCreateCatalog extends SqlCreate implements 
BeamSqlParser.Executa
         if (i > 0) {
           writer.keyword(",");
         }
-        properties.get(i).unparse(writer, leftPrec, rightPrec);
-      }
-
-      for (int i = 0; i < properties.size(); i += 2) {
-        if (i > 0) {
-          writer.keyword(",");
-        }
-        properties.get(i).unparse(writer, leftPrec, rightPrec); // key
+        SqlNode property = properties.get(i);
+        checkState(
+            property instanceof SqlNodeList,
+            String.format(
+                "Unexpected properties entry '%s' of class '%s'", property, 
property.getClass()));
+        SqlNodeList kv = ((SqlNodeList) property);
+
+        kv.get(0).unparse(writer, leftPrec, rightPrec); // key
         writer.keyword("=");
-        properties.get(i + 1).unparse(writer, leftPrec, rightPrec); // value
+        kv.get(1).unparse(writer, leftPrec, rightPrec); // value
       }
       writer.keyword(")");
     }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
index 08ca3488c1c..2a99209e06f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.meta.catalog;
 
 import java.util.Map;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
 
 /**
@@ -25,6 +26,7 @@ import 
org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
  * configuration properties. Uses an underlying {@link MetaStore} to manage 
tables and table
  * providers.
  */
+@Internal
 public interface Catalog {
   /** A type that defines this catalog. */
   String type();
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogManager.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogManager.java
index 2ac5266219b..4654f0dd1b0 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogManager.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogManager.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.meta.catalog;
 
 import java.util.Map;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -32,6 +33,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  * <p>When {@link #registerTableProvider(String, TableProvider)} is called, 
the provider should
  * become available for all catalogs.
  */
+@Internal
 public interface CatalogManager {
   /** Creates and stores a catalog of a particular type. */
   void createCatalog(String name, String type, Map<String, String> properties);
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogRegistrar.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogRegistrar.java
index 3fad539e8cc..08fe6179b1f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogRegistrar.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogRegistrar.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.catalog;
 
+import org.apache.beam.sdk.annotations.Internal;
+
 /**
  * Over-arching registrar to capture available {@link Catalog}s. 
Implementations should be marked
  * with {@link com.google.auto.service.AutoService} to be available to {@link
  * java.util.ServiceLoader}s.
  */
+@Internal
 public interface CatalogRegistrar {
   Iterable<Class<? extends Catalog>> getCatalogs();
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
index 4ff9811605a..8757c235786 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
@@ -17,17 +17,15 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.catalog;
 
-import com.google.auto.service.AutoService;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
 import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
 import org.apache.beam.sdk.util.Preconditions;
 
-@AutoService(Catalog.class)
 public class InMemoryCatalog implements Catalog {
   private final String name;
   private final Map<String, String> properties;
-  private final InMemoryMetaStore metaStore = new InMemoryMetaStore();
+  protected final InMemoryMetaStore metaStore = new InMemoryMetaStore();
 
   public InMemoryCatalog(String name, Map<String, String> properties) {
     this.name = name;
@@ -41,7 +39,8 @@ public class InMemoryCatalog implements Catalog {
 
   @Override
   public String name() {
-    return Preconditions.checkStateNotNull(name, "InMemoryCatalog has not been 
initialized");
+    return Preconditions.checkStateNotNull(
+        name, getClass().getSimpleName() + " has not been initialized");
   }
 
   @Override
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java
index df53818823c..2d94e19c168 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java
@@ -18,12 +18,16 @@
 package org.apache.beam.sdk.extensions.sql.meta.catalog;
 
 import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.extensions.sql.meta.provider.iceberg.IcebergCatalog;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 
 @AutoService(CatalogRegistrar.class)
 public class InMemoryCatalogRegistrar implements CatalogRegistrar {
   @Override
   public Iterable<Class<? extends Catalog>> getCatalogs() {
-    return ImmutableList.of(InMemoryCatalog.class);
+    return ImmutableList.<Class<? extends Catalog>>builder()
+        .add(InMemoryCatalog.class)
+        .add(IcebergCatalog.class)
+        .build();
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
similarity index 65%
copy from 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java
copy to 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
index df53818823c..85fd6f4efc1 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
@@ -15,15 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.catalog;
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
 
-import com.google.auto.service.AutoService;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalog;
+
+public class IcebergCatalog extends InMemoryCatalog {
+  public IcebergCatalog(String name, Map<String, String> properties) {
+    super(name, properties);
+    metaStore.registerProvider(new IcebergTableProvider(name, properties));
+  }
 
-@AutoService(CatalogRegistrar.class)
-public class InMemoryCatalogRegistrar implements CatalogRegistrar {
   @Override
-  public Iterable<Class<? extends Catalog>> getCatalogs() {
-    return ImmutableList.of(InMemoryCatalog.class);
+  public String type() {
+    return "iceberg";
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java
new file mode 100644
index 00000000000..b3854ced46c
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
+
+import static org.apache.beam.sdk.io.iceberg.FilterUtils.SUPPORTED_OPS;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind.AND;
+import static 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind.OR;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall;
+import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef;
+import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode;
+import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName;
+import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class IcebergFilter implements BeamSqlTableFilter {
+  private @Nullable List<RexNode> supported;
+  private @Nullable List<RexNode> unsupported;
+  private final List<RexNode> predicateCNF;
+
+  public IcebergFilter(List<RexNode> predicateCNF) {
+    this.predicateCNF = predicateCNF;
+  }
+
+  private void maybeInitialize() {
+    if (supported != null && unsupported != null) {
+      return;
+    }
+    ImmutableList.Builder<RexNode> supportedBuilder = ImmutableList.builder();
+    ImmutableList.Builder<RexNode> unsupportedBuilder = 
ImmutableList.builder();
+    for (RexNode node : predicateCNF) {
+      if (!node.getType().getSqlTypeName().equals(SqlTypeName.BOOLEAN)) {
+        throw new IllegalArgumentException(
+            "Predicate node '"
+                + node.getClass().getSimpleName()
+                + "' should be a boolean expression, but was: "
+                + node.getType().getSqlTypeName());
+      }
+
+      if (isSupported(node).getLeft()) {
+        supportedBuilder.add(node);
+      } else {
+        unsupportedBuilder.add(node);
+      }
+    }
+    supported = supportedBuilder.build();
+    unsupported = unsupportedBuilder.build();
+  }
+
+  @Override
+  public List<RexNode> getNotSupported() {
+    maybeInitialize();
+    return checkStateNotNull(unsupported);
+  }
+
+  @Override
+  public int numSupported() {
+    maybeInitialize();
+    return 
BeamSqlTableFilter.expressionsInFilter(checkStateNotNull(supported));
+  }
+
+  public List<RexNode> getSupported() {
+    maybeInitialize();
+    return checkStateNotNull(supported);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(IcebergFilter.class)
+        .add(
+            "supported",
+            checkStateNotNull(supported).stream()
+                .map(RexNode::toString)
+                .collect(Collectors.joining()))
+        .add(
+            "unsupported",
+            checkStateNotNull(unsupported).stream()
+                .map(RexNode::toString)
+                .collect(Collectors.joining()))
+        .toString();
+  }
+
+  /**
+   * Check whether a {@code RexNode} is supported. As of right now Iceberg 
supports: 1. Complex
+   * predicates (both conjunction and disjunction). 2. Comparison between a 
column and a literal.
+   *
+   * @param node A node to check for predicate push-down support.
+   * @return A pair containing a boolean whether an expression is supported 
and the number of input
+   *     references used by the expression.
+   */
+  private Pair<Boolean, Integer> isSupported(RexNode node) {
+    int numberOfInputRefs = 0;
+    boolean isSupported = true;
+
+    if (node instanceof RexCall) {
+      RexCall compositeNode = (RexCall) node;
+      if (!SUPPORTED_OPS.contains(node.getKind())) {
+        isSupported = false;
+      } else {
+        for (RexNode operand : compositeNode.getOperands()) {
+          // All operands must be supported for a parent node to be supported.
+          Pair<Boolean, Integer> childSupported = isSupported(operand);
+          if (!node.getKind().belongsTo(ImmutableSet.of(AND, OR))) {
+            numberOfInputRefs += childSupported.getRight();
+          }
+          // Predicate functions with multiple columns are unsupported.
+          isSupported = numberOfInputRefs < 2 && childSupported.getLeft();
+        }
+      }
+    } else if (node instanceof RexInputRef) {
+      numberOfInputRefs = 1;
+    } else if (node instanceof RexLiteral) {
+      // RexLiterals are expected, but no action is needed.
+    } else {
+      throw new UnsupportedOperationException(
+          "Encountered an unexpected node type: " + 
node.getClass().getSimpleName());
+    }
+
+    return Pair.of(isSupported, numberOfInputRefs);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java
new file mode 100644
index 00000000000..9a87edff2a2
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.IntFunction;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
+import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamSqlUnparseContext;
+import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.rel2sql.SqlImplementor;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode;
+import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNode;
+import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.dialect.BigQuerySqlDialect;
+import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParserPos;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class IcebergTable extends SchemaBaseBeamTable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergTable.class);
+  @VisibleForTesting static final String CATALOG_PROPERTIES_FIELD = 
"catalog_properties";
+  @VisibleForTesting static final String HADOOP_CONFIG_PROPERTIES_FIELD = 
"config_properties";
+  @VisibleForTesting static final String CATALOG_NAME_FIELD = "catalog_name";
+
+  @VisibleForTesting
+  static final String TRIGGERING_FREQUENCY_FIELD = 
"triggering_frequency_seconds";
+
+  @VisibleForTesting final String tableIdentifier;
+  @VisibleForTesting final IcebergCatalogConfig catalogConfig;
+  @VisibleForTesting @Nullable Integer triggeringFrequency;
+
+  IcebergTable(Table table, IcebergCatalogConfig catalogConfig) {
+    super(table.getSchema());
+    this.schema = table.getSchema();
+    this.tableIdentifier = checkArgumentNotNull(table.getLocation());
+    this.catalogConfig = catalogConfig;
+    ObjectNode properties = table.getProperties();
+    if (properties.has(TRIGGERING_FREQUENCY_FIELD)) {
+      this.triggeringFrequency = 
properties.get(TRIGGERING_FREQUENCY_FIELD).asInt();
+    }
+  }
+
+  @Override
+  public POutput buildIOWriter(PCollection<Row> input) {
+    ImmutableMap.Builder<String, Object> configBuilder = 
ImmutableMap.builder();
+    configBuilder.putAll(getBaseConfig());
+    if (triggeringFrequency != null) {
+      configBuilder.put(TRIGGERING_FREQUENCY_FIELD, triggeringFrequency);
+    }
+    return 
input.apply(Managed.write(Managed.ICEBERG).withConfig(configBuilder.build()));
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    return begin
+        .apply(Managed.read(Managed.ICEBERG).withConfig(getBaseConfig()))
+        .getSinglePCollection();
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(
+      PBegin begin, BeamSqlTableFilter filter, List<String> fieldNames) {
+
+    Map<String, Object> readConfig = new HashMap<>(getBaseConfig());
+
+    if (!(filter instanceof DefaultTableFilter)) {
+      IcebergFilter icebergFilter = (IcebergFilter) filter;
+      if (!icebergFilter.getSupported().isEmpty()) {
+        String expression = generateFilterExpression(getSchema(), 
icebergFilter.getSupported());
+        if (!expression.isEmpty()) {
+          LOG.info("Pushing down the following filter: {}", expression);
+          readConfig.put("filter", expression);
+        }
+      }
+    }
+
+    if (!fieldNames.isEmpty()) {
+      readConfig.put("keep", fieldNames);
+    }
+
+    return begin
+        .apply("Read Iceberg with push-down", 
Managed.read(Managed.ICEBERG).withConfig(readConfig))
+        .getSinglePCollection();
+  }
+
+  @Override
+  public ProjectSupport supportsProjects() {
+    return ProjectSupport.WITHOUT_FIELD_REORDERING;
+  }
+
+  @Override
+  public BeamSqlTableFilter constructFilter(List<RexNode> filter) {
+    return new IcebergFilter(filter);
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  private Map<String, Object> getBaseConfig() {
+    ImmutableMap.Builder<String, Object> managedConfigBuilder = 
ImmutableMap.builder();
+    managedConfigBuilder.put("table", tableIdentifier);
+    @Nullable String name = catalogConfig.getCatalogName();
+    @Nullable Map<String, String> catalogProps = 
catalogConfig.getCatalogProperties();
+    @Nullable Map<String, String> hadoopConfProps = 
catalogConfig.getConfigProperties();
+    if (name != null) {
+      managedConfigBuilder.put(CATALOG_NAME_FIELD, name);
+    }
+    if (catalogProps != null) {
+      managedConfigBuilder.put(CATALOG_PROPERTIES_FIELD, catalogProps);
+    }
+    if (hadoopConfProps != null) {
+      managedConfigBuilder.put(HADOOP_CONFIG_PROPERTIES_FIELD, 
hadoopConfProps);
+    }
+    return managedConfigBuilder.build();
+  }
+
+  private String generateFilterExpression(Schema schema, List<RexNode> 
supported) {
+    final IntFunction<SqlNode> field =
+        i -> new SqlIdentifier(schema.getField(i).getName(), 
SqlParserPos.ZERO);
+
+    SqlImplementor.Context context = new BeamSqlUnparseContext(field);
+
+    // Create a single SqlNode from a list of RexNodes
+    SqlNode andSqlNode = null;
+    for (RexNode node : supported) {
+      SqlNode sqlNode = context.toSql(null, node);
+      if (andSqlNode == null) {
+        andSqlNode = sqlNode;
+        continue;
+      }
+      // AND operator must have exactly 2 operands.
+      andSqlNode =
+          SqlStdOperatorTable.AND.createCall(
+              SqlParserPos.ZERO, ImmutableList.of(andSqlNode, sqlNode));
+    }
+    return 
checkStateNotNull(andSqlNode).toSqlString(BigQuerySqlDialect.DEFAULT).getSql();
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java
new file mode 100644
index 00000000000..38acc65d6d7
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
+
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
+import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class IcebergTableProvider extends InMemoryMetaTableProvider {
+  private static final String BEAM_HADOOP_PREFIX = "beam.catalog.%s.hadoop";
+  @VisibleForTesting final IcebergCatalogConfig catalogConfig;
+
+  public IcebergTableProvider(String name, Map<String, String> properties) {
+    ImmutableMap.Builder<String, String> catalogProps = ImmutableMap.builder();
+    ImmutableMap.Builder<String, String> hadoopProps = ImmutableMap.builder();
+    String hadoopPrefix = String.format(BEAM_HADOOP_PREFIX, name);
+
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      if (entry.getKey().startsWith(hadoopPrefix)) {
+        hadoopProps.put(entry.getKey(), entry.getValue());
+      } else {
+        catalogProps.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    catalogConfig =
+        IcebergCatalogConfig.builder()
+            .setCatalogName(name)
+            .setCatalogProperties(catalogProps.build())
+            .setConfigProperties(hadoopProps.build())
+            .build();
+  }
+
+  @Override
+  public String getTableType() {
+    return "iceberg";
+  }
+
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table table) {
+    return new IcebergTable(table, catalogConfig);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogRegistrar.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java
similarity index 69%
copy from 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogRegistrar.java
copy to 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java
index 3fad539e8cc..01a34920f58 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogRegistrar.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java
@@ -15,13 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.catalog;
 
-/**
- * Over-arching registrar to capture available {@link Catalog}s. 
Implementations should be marked
- * with {@link com.google.auto.service.AutoService} to be available to {@link
- * java.util.ServiceLoader}s.
- */
-public interface CatalogRegistrar {
-  Iterable<Class<? extends Catalog>> getCatalogs();
-}
+/** Table schema for Iceberg. */
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
index 8a892cc2eb7..a571a5d2dbb 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
@@ -113,4 +113,8 @@ public class InMemoryMetaStore implements MetaStore {
   Map<String, TableProvider> getProviders() {
     return providers;
   }
+
+  public boolean hasProvider(TableProvider provider) {
+    return providers.containsKey(provider.getTableType());
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java
new file mode 100644
index 00000000000..59f3b7650c7
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests stream writing to Iceberg with Beam SQL. */
+@RunWith(JUnit4.class)
+public class PubsubToIcebergIT implements Serializable {
+  private static final Schema SOURCE_SCHEMA =
+      Schema.builder().addNullableField("id", INT64).addNullableField("name", 
STRING).build();
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public transient TestPubsub pubsub = TestPubsub.create();
+  private static final BigqueryClient BQ_CLIENT = new 
BigqueryClient("PubsubToIcebergIT");
+  static final String DATASET = "sql_pubsub_to_iceberg_it_" + 
System.nanoTime();
+  static String warehouse;
+  protected static final GcpOptions OPTIONS =
+      TestPipeline.testingPipelineOptions().as(GcpOptions.class);
+  @Rule public TestName testName = new TestName();
+
+  @BeforeClass
+  public static void createDataset() throws IOException, InterruptedException {
+    warehouse =
+        String.format(
+            "%s%s/%s",
+            TestPipeline.testingPipelineOptions().getTempLocation(),
+            PubsubToIcebergIT.class.getSimpleName(),
+            UUID.randomUUID());
+    BQ_CLIENT.createNewDataset(OPTIONS.getProject(), DATASET);
+    createCatalogDdl =
+        "CREATE CATALOG my_catalog \n"
+            + "TYPE iceberg \n"
+            + "PROPERTIES (\n"
+            + "  'catalog-impl' = 
'org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', \n"
+            + "  'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO', \n"
+            + format("  'warehouse' = '%s', \n", warehouse)
+            + format("  'gcp_project' = '%s', \n", OPTIONS.getProject())
+            + "  'gcp_region' = 'us-central1')";
+    setCatalogDdl = "SET CATALOG my_catalog";
+  }
+
+  private String tableIdentifier;
+  private static String createCatalogDdl;
+  private static String setCatalogDdl;
+
+  @Before
+  public void setup() {
+    tableIdentifier = DATASET + "." + testName.getMethodName();
+  }
+
+  @AfterClass
+  public static void deleteDataset() {
+    BQ_CLIENT.deleteDataset(OPTIONS.getProject(), DATASET);
+  }
+
+  @Test
+  public void testSimpleInsert() throws Exception {
+    String pubsubTableString =
+        "CREATE EXTERNAL TABLE pubsub_topic (\n"
+            + "event_timestamp TIMESTAMP, \n"
+            + "attributes MAP<VARCHAR, VARCHAR>, \n"
+            + "payload ROW< \n"
+            + "             id BIGINT, \n"
+            + "             name VARCHAR \n"
+            + "           > \n"
+            + ") \n"
+            + "TYPE 'pubsub' \n"
+            + "LOCATION '"
+            + pubsub.topicPath()
+            + "' \n"
+            + "TBLPROPERTIES '{ \"timestampAttributeKey\" : \"ts\" }'";
+    String icebergTableString =
+        "CREATE EXTERNAL TABLE iceberg_table( \n"
+            + "   id BIGINT, \n"
+            + "   name VARCHAR \n "
+            + ") \n"
+            + "TYPE 'iceberg' \n"
+            + "LOCATION '"
+            + tableIdentifier
+            + "' \n"
+            + "TBLPROPERTIES '{ \"triggering_frequency_seconds\" : 10 }'";
+    String insertStatement =
+        "INSERT INTO iceberg_table \n"
+            + "SELECT \n"
+            + "  pubsub_topic.payload.id, \n"
+            + "  pubsub_topic.payload.name \n"
+            + "FROM pubsub_topic";
+    pipeline.apply(
+        SqlTransform.query(insertStatement)
+            .withDdlString(createCatalogDdl)
+            .withDdlString(setCatalogDdl)
+            .withDdlString(pubsubTableString)
+            .withDdlString(icebergTableString));
+    pipeline.run();
+
+    // Block until a subscription for this topic exists
+    pubsub.assertSubscriptionEventuallyCreated(
+        pipeline.getOptions().as(GcpOptions.class).getProject(), 
Duration.standardMinutes(5));
+
+    List<PubsubMessage> messages =
+        ImmutableList.of(
+            message(ts(1), 3, "foo"), message(ts(2), 5, "bar"), message(ts(3), 
7, "baz"));
+    pubsub.publish(messages);
+
+    validateRowsWritten();
+  }
+
+  @Test
+  public void testSimpleInsertFlat() throws Exception {
+    String pubsubTableString =
+        "CREATE EXTERNAL TABLE pubsub_topic (\n"
+            + "event_timestamp TIMESTAMP, \n"
+            + "id BIGINT, \n"
+            + "name VARCHAR \n"
+            + ") \n"
+            + "TYPE 'pubsub' \n"
+            + "LOCATION '"
+            + pubsub.topicPath()
+            + "' \n"
+            + "TBLPROPERTIES '{ \"timestampAttributeKey\" : \"ts\" }'";
+    String bqTableString =
+        "CREATE EXTERNAL TABLE iceberg_table( \n"
+            + "   id BIGINT, \n"
+            + "   name VARCHAR \n "
+            + ") \n"
+            + "TYPE 'iceberg' \n"
+            + "LOCATION '"
+            + tableIdentifier
+            + "' \n"
+            + "TBLPROPERTIES '{ \"triggering_frequency_seconds\" : 10 }'";
+    String insertStatement =
+        "INSERT INTO iceberg_table \n"
+            + "SELECT \n"
+            + "  id, \n"
+            + "  name \n"
+            + "FROM pubsub_topic";
+
+    pipeline.apply(
+        SqlTransform.query(insertStatement)
+            .withDdlString(createCatalogDdl)
+            .withDdlString(setCatalogDdl)
+            .withDdlString(pubsubTableString)
+            .withDdlString(bqTableString));
+    pipeline.run();
+
+    // Block until a subscription for this topic exists
+    pubsub.assertSubscriptionEventuallyCreated(
+        pipeline.getOptions().as(GcpOptions.class).getProject(), 
Duration.standardMinutes(5));
+
+    List<PubsubMessage> messages =
+        ImmutableList.of(
+            message(ts(1), 3, "foo"), message(ts(2), 5, "bar"), message(ts(3), 
7, "baz"));
+    pubsub.publish(messages);
+
+    validateRowsWritten();
+  }
+
+  private void validateRowsWritten() throws IOException, InterruptedException {
+    String query = String.format("SELECT * FROM `%s.%s`", 
OPTIONS.getProject(), tableIdentifier);
+    List<Row> expectedRows =
+        ImmutableList.of(
+            row(SOURCE_SCHEMA, 3L, "foo"),
+            row(SOURCE_SCHEMA, 5L, "bar"),
+            row(SOURCE_SCHEMA, 7L, "baz"));
+
+    BackOff backOff =
+        FluentBackoff.DEFAULT
+            .withInitialBackoff(Duration.standardSeconds(10))
+            .withMaxBackoff(Duration.standardSeconds(20))
+            .withMaxCumulativeBackoff(Duration.standardMinutes(5))
+            .backoff();
+    Sleeper sleeper = Sleeper.DEFAULT;
+    do {
+      List<TableRow> returnedRows = new ArrayList<>();
+      try {
+        returnedRows = BQ_CLIENT.queryUnflattened(query, OPTIONS.getProject(), 
true, true);
+      } catch (GoogleJsonResponseException e) {
+        if (e.getStatusCode() != HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
+          throw new RuntimeException(e);
+        }
+      }
+      List<Row> beamRows =
+          returnedRows.stream()
+              .map(r -> BigQueryUtils.toBeamRow(SOURCE_SCHEMA, r))
+              .collect(Collectors.toList());
+      if (beamRows.containsAll(expectedRows)) {
+        return;
+      }
+    } while (BackOffUtils.next(sleeper, backOff));
+
+    throw new RuntimeException("Polled for 5 minutes and could not find all 
rows in table.");
+  }
+
+  private Row row(Schema schema, Object... values) {
+    return Row.withSchema(schema).addValues(values).build();
+  }
+
+  private PubsubMessage message(Instant timestamp, int id, String name) {
+    return message(timestamp, jsonString(id, name));
+  }
+
+  private PubsubMessage message(Instant timestamp, String jsonPayload) {
+    return new PubsubMessage(
+        jsonPayload.getBytes(UTF_8), ImmutableMap.of("ts", 
String.valueOf(timestamp.getMillis())));
+  }
+
+  private String jsonString(int id, String name) {
+    return "{ \"id\" : " + id + ", \"name\" : \"" + name + "\" }";
+  }
+
+  private Instant ts(long millis) {
+    return new Instant(millis);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java
new file mode 100644
index 00000000000..f14344b4f1f
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
+
+import static 
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.PUSH_DOWN_OPTION;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+
+import org.apache.beam.sdk.extensions.sql.TableUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.PushDownOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test class for {@link IcebergFilter}. */
+@RunWith(JUnit4.class)
+public class IcebergFilterTest {
+  // TODO: add date, time, and datetime fields.
+  private static final Schema BASIC_SCHEMA =
+      Schema.builder()
+          .addInt32Field("unused1")
+          .addInt32Field("id")
+          .addStringField("name")
+          .addInt16Field("unused2")
+          .addBooleanField("b")
+          .build();
+
+  private BeamSqlEnv sqlEnv;
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Before
+  public void buildUp() {
+    TestTableProvider tableProvider = new TestTableProvider();
+    Table table = getTable("TEST", PushDownOptions.NONE);
+    tableProvider.createTable(table);
+    tableProvider.addRows(
+        table.getName(),
+        row(BASIC_SCHEMA, 100, 1, "one", (short) 100, true),
+        row(BASIC_SCHEMA, 200, 2, "two", (short) 200, false));
+
+    sqlEnv =
+        BeamSqlEnv.builder(tableProvider)
+            .setPipelineOptions(PipelineOptionsFactory.create())
+            .build();
+  }
+
+  @Test
+  public void testIsSupported() {
+    ImmutableList<Pair<String, Boolean>> sqlQueries =
+        ImmutableList.of(
+            Pair.of("select * from TEST where unused1=100", true),
+            Pair.of("select * from TEST where unused1 in (100, 200)", true),
+            Pair.of("select * from TEST where unused1+10=110", true),
+            Pair.of("select * from TEST where b", true),
+            Pair.of(
+                "select * from TEST where unused1>100 and unused1<=200 and 
id<>1 and (name='two' or id=2)",
+                true),
+            Pair.of("select * from TEST where unused2=200", true),
+            // Functions involving more than one column are not supported yet.
+            Pair.of("select * from TEST where unused1=unused2 and id=2", 
false));
+
+    for (Pair<String, Boolean> query : sqlQueries) {
+      String sql = query.getLeft();
+      Boolean isSupported = query.getRight();
+
+      BeamRelNode beamRelNode = sqlEnv.parseQuery(sql);
+      assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+      IcebergFilter filter =
+          new IcebergFilter(((BeamCalcRel) 
beamRelNode).getProgram().split().right);
+
+      assertThat(
+          "Query: '" + sql + "' is expected to be " + (isSupported ? 
"supported." : "unsupported."),
+          filter.getNotSupported().isEmpty() == isSupported);
+    }
+  }
+
+  private static Table getTable(String name, PushDownOptions options) {
+    return Table.builder()
+        .name(name)
+        .comment(name + " table")
+        .schema(BASIC_SCHEMA)
+        .properties(
+            TableUtils.parseProperties(
+                "{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + 
"\" }"))
+        .type("test")
+        .build();
+  }
+
+  private static Row row(Schema schema, Object... objects) {
+    return Row.withSchema(schema).addValues(objects).build();
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java
new file mode 100644
index 00000000000..fd3c18b6072
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
+
+import static java.lang.String.format;
+import static java.util.Arrays.asList;
+import static 
org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.array;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.row;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamPushDownIOSourceRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for writing to Iceberg with Beam SQL. */
+@RunWith(JUnit4.class)
+public class IcebergReadWriteIT {
+  private static final Schema NESTED_SCHEMA =
+      Schema.builder()
+          .addNullableArrayField("c_arr_struct_arr", STRING)
+          .addNullableInt32Field("c_arr_struct_integer")
+          .build();
+  private static final Schema SOURCE_SCHEMA =
+      Schema.builder()
+          .addNullableField("c_bigint", INT64)
+          .addNullableField("c_integer", INT32)
+          .addNullableField("c_float", FLOAT)
+          .addNullableField("c_double", DOUBLE)
+          .addNullableField("c_boolean", BOOLEAN)
+          .addNullableField("c_timestamp", CalciteUtils.TIMESTAMP)
+          .addNullableField("c_varchar", STRING)
+          .addNullableField("c_char", STRING)
+          .addNullableField("c_arr", array(STRING))
+          .addNullableField("c_arr_struct", array(row(NESTED_SCHEMA)))
+          .build();
+
+  @Rule public transient TestPipeline writePipeline = TestPipeline.create();
+  @Rule public transient TestPipeline readPipeline = TestPipeline.create();
+  @Rule public TestName testName = new TestName();
+
+  private static final BigqueryClient BQ_CLIENT = new 
BigqueryClient("IcebergReadWriteIT");
+  static final String DATASET = "iceberg_sql_tests_" + System.nanoTime();
+  static String warehouse;
+  protected static final GcpOptions OPTIONS =
+      TestPipeline.testingPipelineOptions().as(GcpOptions.class);
+
+  @BeforeClass
+  public static void createDataset() throws IOException, InterruptedException {
+    warehouse =
+        format(
+            "%s%s/%s",
+            TestPipeline.testingPipelineOptions().getTempLocation(),
+            IcebergReadWriteIT.class.getSimpleName(),
+            UUID.randomUUID());
+    BQ_CLIENT.createNewDataset(OPTIONS.getProject(), DATASET);
+  }
+
+  @AfterClass
+  public static void deleteDataset() {
+    BQ_CLIENT.deleteDataset(OPTIONS.getProject(), DATASET);
+  }
+
+  @Test
+  public void testSqlWriteAndRead() throws IOException, InterruptedException {
+    BeamSqlEnv sqlEnv =
+        BeamSqlEnv.builder(new InMemoryCatalogManager())
+            .setPipelineOptions(PipelineOptionsFactory.create())
+            .build();
+    String tableIdentifier = DATASET + "." + testName.getMethodName();
+
+    // 1) create Iceberg catalog
+    String createCatalog =
+        "CREATE CATALOG my_catalog \n"
+            + "TYPE iceberg \n"
+            + "PROPERTIES (\n"
+            + "  'catalog-impl' = 
'org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', \n"
+            + "  'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO', \n"
+            + format("  'warehouse' = '%s', \n", warehouse)
+            + format("  'gcp_project' = '%s', \n", OPTIONS.getProject())
+            + "  'gcp_region' = 'us-central1')";
+    sqlEnv.executeDdl(createCatalog);
+
+    // 2) use the catalog we just created
+    String setCatalog = "SET CATALOG my_catalog";
+    sqlEnv.executeDdl(setCatalog);
+
+    // 3) create beam table
+    String createTableStatement =
+        "CREATE EXTERNAL TABLE TEST( \n"
+            + "   c_bigint BIGINT, \n"
+            + "   c_integer INTEGER, \n"
+            + "   c_float FLOAT, \n"
+            + "   c_double DOUBLE, \n"
+            + "   c_boolean BOOLEAN, \n"
+            + "   c_timestamp TIMESTAMP, \n"
+            + "   c_varchar VARCHAR, \n "
+            + "   c_char CHAR, \n"
+            + "   c_arr ARRAY<VARCHAR>, \n"
+            + "   c_arr_struct ARRAY<ROW<c_arr_struct_arr ARRAY<VARCHAR>, 
c_arr_struct_integer INTEGER>> \n"
+            + ") \n"
+            + "TYPE 'iceberg' \n"
+            + "LOCATION '"
+            + tableIdentifier
+            + "'";
+    sqlEnv.executeDdl(createTableStatement);
+
+    // 3) write to underlying Iceberg table
+    String insertStatement =
+        "INSERT INTO TEST VALUES ("
+            + "9223372036854775807, "
+            + "2147483647, "
+            + "1.0, "
+            + "1.0, "
+            + "TRUE, "
+            + "TIMESTAMP '2018-05-28 20:17:40.123', "
+            + "'varchar', "
+            + "'char', "
+            + "ARRAY['123', '456'], "
+            + "ARRAY["
+            + "ROW(ARRAY['abc', 'xyz'], 123), "
+            + "ROW(ARRAY['foo', 'bar'], 456), "
+            + "ROW(ARRAY['cat', 'dog'], 789)]"
+            + ")";
+    BeamSqlRelUtils.toPCollection(writePipeline, 
sqlEnv.parseQuery(insertStatement));
+    writePipeline.run().waitUntilFinish();
+
+    // 4) run external query on Iceberg table (hosted on BQ) to verify correct 
row was written
+    String query = format("SELECT * FROM `%s.%s`", OPTIONS.getProject(), 
tableIdentifier);
+    TableRow returnedRow =
+        BQ_CLIENT.queryUnflattened(query, OPTIONS.getProject(), true, 
true).get(0);
+    Row beamRow = BigQueryUtils.toBeamRow(SOURCE_SCHEMA, returnedRow);
+    Row expectedRow =
+        Row.withSchema(SOURCE_SCHEMA)
+            .addValues(
+                9223372036854775807L,
+                2147483647,
+                (float) 1.0,
+                1.0,
+                true,
+                parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"),
+                "varchar",
+                "char",
+                asList("123", "456"),
+                asList(
+                    nestedRow(asList("abc", "xyz"), 123),
+                    nestedRow(asList("foo", "bar"), 456),
+                    nestedRow(asList("cat", "dog"), 789)))
+            .build();
+    assertEquals(expectedRow, beamRow);
+
+    // 5) read using Beam SQL and verify
+    String selectTableStatement = "SELECT * FROM TEST";
+    PCollection<Row> output =
+        BeamSqlRelUtils.toPCollection(readPipeline, 
sqlEnv.parseQuery(selectTableStatement));
+    PAssert.that(output).containsInAnyOrder(expectedRow);
+    PipelineResult.State state = readPipeline.run().waitUntilFinish();
+    assertThat(state, equalTo(PipelineResult.State.DONE));
+  }
+
+  @Test
+  public void testSQLReadWithProjectAndFilterPushDown() {
+    BeamSqlEnv sqlEnv =
+        BeamSqlEnv.builder(new InMemoryCatalogManager())
+            .setPipelineOptions(PipelineOptionsFactory.create())
+            .build();
+    String tableIdentifier = DATASET + "." + testName.getMethodName();
+
+    // 1) create Iceberg catalog
+    String createCatalog =
+        "CREATE CATALOG my_catalog \n"
+            + "TYPE iceberg \n"
+            + "PROPERTIES (\n"
+            + "  'catalog-impl' = 
'org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', \n"
+            + "  'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO', \n"
+            + format("  'warehouse' = '%s', \n", warehouse)
+            + format("  'gcp_project' = '%s', \n", OPTIONS.getProject())
+            + "  'gcp_region' = 'us-central1')";
+    sqlEnv.executeDdl(createCatalog);
+
+    // 2) use the catalog we just created
+    String setCatalog = "SET CATALOG my_catalog";
+    sqlEnv.executeDdl(setCatalog);
+
+    // 3) create Beam table
+    String createTableStatement =
+        "CREATE EXTERNAL TABLE TEST( \n"
+            + "   c_integer INTEGER, \n"
+            + "   c_float FLOAT, \n"
+            + "   c_boolean BOOLEAN, \n"
+            + "   c_timestamp TIMESTAMP, \n"
+            + "   c_varchar VARCHAR \n "
+            + ") \n"
+            + "TYPE 'iceberg' \n"
+            + "LOCATION '"
+            + tableIdentifier
+            + "'";
+    sqlEnv.executeDdl(createTableStatement);
+
+    // 4) insert some data)
+    String insertStatement =
+        "INSERT INTO TEST VALUES "
+            + "(123, 1.23, TRUE, TIMESTAMP '2025-05-22 20:17:40.123', 'a'), "
+            + "(456, 4.56, FALSE, TIMESTAMP '2025-05-25 20:17:40.123', 'b'), "
+            + "(789, 7.89, TRUE, TIMESTAMP '2025-05-28 20:17:40.123', 'c')";
+    BeamSqlRelUtils.toPCollection(writePipeline, 
sqlEnv.parseQuery(insertStatement));
+    writePipeline.run().waitUntilFinish(Duration.standardMinutes(5));
+
+    // 5) read with a filter
+    String selectTableStatement =
+        "SELECT c_integer, c_varchar FROM TEST where "
+            + "(c_boolean=TRUE and c_varchar in ('a', 'b')) or c_float > 5";
+    BeamRelNode relNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> output = BeamSqlRelUtils.toPCollection(readPipeline, 
relNode);
+
+    assertThat(relNode, instanceOf(BeamPushDownIOSourceRel.class));
+    // Unused fields should not be projected by an IO
+    assertThat(relNode.getRowType().getFieldNames(), 
containsInAnyOrder("c_integer", "c_varchar"));
+
+    assertThat(
+        output.getSchema(),
+        equalTo(
+            Schema.builder()
+                .addNullableField("c_integer", INT32)
+                .addNullableField("c_varchar", STRING)
+                .build()));
+
+    PAssert.that(output)
+        .containsInAnyOrder(
+            Row.withSchema(output.getSchema()).addValues(123, "a").build(),
+            Row.withSchema(output.getSchema()).addValues(789, "c").build());
+    PipelineResult.State state = 
readPipeline.run().waitUntilFinish(Duration.standardMinutes(5));
+    assertThat(state, equalTo(PipelineResult.State.DONE));
+  }
+
+  private Row nestedRow(List<String> arr, Integer intVal) {
+    return Row.withSchema(NESTED_SCHEMA).addValues(arr, intVal).build();
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java
new file mode 100644
index 00000000000..316028d7599
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
+
+import static 
org.apache.beam.sdk.extensions.sql.meta.provider.iceberg.IcebergTable.TRIGGERING_FREQUENCY_FIELD;
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.stream.Stream;
+import org.apache.beam.sdk.extensions.sql.TableUtils;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
+import 
org.apache.beam.vendor.calcite.v1_28_0.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+/** UnitTest for {@link IcebergTableProvider}. */
+public class IcebergTableProviderTest {
+  private final IcebergTableProvider provider =
+      new IcebergTableProvider(
+          "test_catalog",
+          ImmutableMap.of(
+              "catalog-impl", 
"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog",
+              "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
+              "warehouse", "gs://bucket/warehouse",
+              "beam.catalog.test_catalog.hadoop.fs.gs.project.id", 
"apache-beam-testing",
+              "beam.catalog.test_catalog.hadoop.foo", "bar"));
+
+  @Test
+  public void testGetTableType() {
+    assertEquals("iceberg", provider.getTableType());
+  }
+
+  @Test
+  public void testBuildBeamSqlTable() throws Exception {
+    ImmutableMap<String, Object> properties = 
ImmutableMap.of(TRIGGERING_FREQUENCY_FIELD, 30);
+
+    ObjectMapper mapper = new ObjectMapper();
+    String propertiesString = mapper.writeValueAsString(properties);
+    Table table = fakeTableWithProperties("my_table", propertiesString);
+    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+
+    assertNotNull(sqlTable);
+    assertTrue(sqlTable instanceof IcebergTable);
+
+    IcebergTable icebergTable = (IcebergTable) sqlTable;
+    assertEquals("namespace.table", icebergTable.tableIdentifier);
+    assertEquals(provider.catalogConfig, icebergTable.catalogConfig);
+  }
+
+  private static Table fakeTableWithProperties(String name, String properties) 
{
+    return Table.builder()
+        .name(name)
+        .comment(name + " table")
+        .location("namespace.table")
+        .schema(
+            Stream.of(
+                    Schema.Field.nullable("id", Schema.FieldType.INT32),
+                    Schema.Field.nullable("name", Schema.FieldType.STRING))
+                .collect(toSchema()))
+        .type("iceberg")
+        .properties(TableUtils.parseProperties(properties))
+        .build();
+  }
+}
diff --git a/sdks/java/io/expansion-service/build.gradle 
b/sdks/java/io/expansion-service/build.gradle
index 6655df7d80c..2e6274571a8 100644
--- a/sdks/java/io/expansion-service/build.gradle
+++ b/sdks/java/io/expansion-service/build.gradle
@@ -71,13 +71,8 @@ dependencies {
   implementation project(":sdks:java:io:kafka:upgrade")
   permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761
 
-  // **** IcebergIO runtime dependencies ****
-  runtimeOnly library.java.hadoop_auth
-  runtimeOnly library.java.hadoop_client
-  // For writing to GCS
-  runtimeOnly library.java.bigdataoss_gcs_connector
+  // **** IcebergIO catalogs ****
   // HiveCatalog
-  runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2")
   runtimeOnly project(path: ":sdks:java:io:iceberg:hive")
   // BigQueryMetastoreCatalog (Java 11+)
   runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: 
"shadow")
diff --git a/sdks/java/io/iceberg/build.gradle 
b/sdks/java/io/iceberg/build.gradle
index 4d79fb061d0..2ac04eb067a 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -63,11 +63,11 @@ dependencies {
     permitUnusedDeclared "org.immutables:value:2.8.8"
     implementation library.java.vendored_calcite_1_28_0
     runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"
+    runtimeOnly library.java.bigdataoss_gcs_connector
+    runtimeOnly library.java.hadoop_client
 
     testImplementation project(":sdks:java:managed")
-    testImplementation library.java.hadoop_client
     testImplementation library.java.bigdataoss_gcsio
-    testImplementation library.java.bigdataoss_gcs_connector
     testImplementation library.java.bigdataoss_util_hadoop
     testImplementation "org.apache.parquet:parquet-avro:$parquet_version"
     testImplementation "org.apache.parquet:parquet-common:$parquet_version"
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
index 1ff7119196e..614c45fcf62 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
@@ -31,6 +31,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Internal;
 import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlBasicCall;
 import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlIdentifier;
 import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind;
@@ -47,6 +48,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expression.Operation;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.iceberg.util.NaNUtil;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -57,8 +59,9 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  *
  * <p>Note: Only supports top-level fields (i.e. cannot reference nested 
fields).
  */
-class FilterUtils {
-  private static final Map<SqlKind, Operation> FILTERS =
+@Internal
+public class FilterUtils {
+  static final Map<SqlKind, Operation> FILTERS =
       ImmutableMap.<SqlKind, Operation>builder()
           .put(SqlKind.IS_NULL, Operation.IS_NULL)
           .put(SqlKind.IS_NOT_NULL, Operation.NOT_NULL)
@@ -74,6 +77,8 @@ class FilterUtils {
           .put(SqlKind.OR, Operation.OR)
           .build();
 
+  public static final Set<SqlKind> SUPPORTED_OPS = FILTERS.keySet();
+
   /**
    * Parses a SQL filter expression string and returns a set of all field 
names referenced within
    * it.
@@ -115,7 +120,6 @@ class FilterUtils {
     }
     // SqlLiteral nodes do not contain field names, so we can ignore them.
   }
-
   /**
    * parses a SQL filter expression string into an Iceberg {@link Expression} 
that can be used for
    * data pruning.
@@ -261,8 +265,10 @@ class FilterUtils {
     checkArgument(
         value instanceof SqlNodeList,
         "Expected right hand side to be a list but got " + value.getClass());
-    String name = ((SqlIdentifier) term).getSimple();
-    TypeID type = schema.findType(name).typeId();
+    String caseInsensitiveName = ((SqlIdentifier) term).getSimple();
+    Types.NestedField field = 
schema.caseInsensitiveFindField(caseInsensitiveName);
+    String name = field.name();
+    TypeID type = field.type().typeId();
     List<SqlNode> list =
         ((SqlNodeList) value)
             
.getList().stream().filter(Objects::nonNull).collect(Collectors.toList());
@@ -287,13 +293,17 @@ class FilterUtils {
     SqlNode left = getLeftChild(call);
     SqlNode right = getRightChild(call);
     if (left instanceof SqlIdentifier && right instanceof SqlLiteral) {
-      String name = ((SqlIdentifier) left).getSimple();
-      TypeID type = schema.findType(name).typeId();
+      String caseInsensitiveName = ((SqlIdentifier) left).getSimple();
+      Types.NestedField field = 
schema.caseInsensitiveFindField(caseInsensitiveName);
+      String name = field.name();
+      TypeID type = field.type().typeId();
       Object value = convertLiteral((SqlLiteral) right, name, type);
       return convertLR.apply(name, value);
     } else if (left instanceof SqlLiteral && right instanceof SqlIdentifier) {
-      String name = ((SqlIdentifier) right).getSimple();
-      TypeID type = schema.findType(name).typeId();
+      String caseInsensitiveName = ((SqlIdentifier) right).getSimple();
+      Types.NestedField field = 
schema.caseInsensitiveFindField(caseInsensitiveName);
+      String name = field.name();
+      TypeID type = field.type().typeId();
       Object value = convertLiteral((SqlLiteral) left, name, type);
       return convertRL.apply(name, value);
     } else {
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index c3a185f3a83..3358bc072a2 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -50,6 +50,8 @@ public abstract class IcebergCatalogConfig implements 
Serializable {
     return new AutoValue_IcebergCatalogConfig.Builder();
   }
 
+  public abstract Builder toBuilder();
+
   public org.apache.iceberg.catalog.Catalog catalog() {
     if (cachedCatalog == null) {
       String catalogName = getCatalogName();
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
index de88b4af269..81ec229df70 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
@@ -93,7 +93,6 @@ class ScanTaskReader extends BoundedSource.BoundedReader<Row> 
{
 
     // This nullness annotation is incorrect, but the most expedient way to 
work with Iceberg's APIs
     // which are not null-safe.
-    @SuppressWarnings("nullness")
     org.apache.iceberg.Schema requiredSchema = 
source.getScanConfig().getRequiredSchema();
     @Nullable
     String nameMapping = 
source.getTable().properties().get(TableProperties.DEFAULT_NAME_MAPPING);

Reply via email to