This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 76708d54438 added ddl to SqlTransform (#34614)
76708d54438 is described below

commit 76708d54438c663c1d3112329c4ce896a5796ced
Author: liferoad <[email protected]>
AuthorDate: Sun Apr 13 08:26:49 2025 -0400

    added ddl to SqlTransform (#34614)
    
    * added ddl to SqlTransform
    
    * uncommented tests
---
 .../expansion/ExternalSqlTransformRegistrar.java   |  9 ++++++++
 sdks/python/apache_beam/transforms/sql.py          |  9 +++++---
 sdks/python/apache_beam/transforms/sql_test.py     | 24 ++++++++++++++++++++++
 3 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
index 86f2260c974..d7a9bb28896 100644
--- a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
+++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
@@ -50,6 +50,7 @@ public class ExternalSqlTransformRegistrar implements ExternalTransformRegistrar
   public static class Configuration {
     String query = "";
     @Nullable String dialect;
+    @Nullable String ddl;
 
     public void setQuery(String query) {
       this.query = query;
@@ -58,6 +59,10 @@ public class ExternalSqlTransformRegistrar implements ExternalTransformRegistrar
     public void setDialect(@Nullable String dialect) {
       this.dialect = dialect;
     }
+
+    public void setDdl(@Nullable String ddl) {
+      this.ddl = ddl;
+    }
   }
 
   private static class Builder
@@ -76,6 +81,10 @@ public class ExternalSqlTransformRegistrar implements ExternalTransformRegistrar
         }
         transform = transform.withQueryPlannerClass(queryPlanner);
       }
+      // Add any DDL string
+      if (configuration.ddl != null) {
+        transform = transform.withDdlString(configuration.ddl);
+      }
       return transform;
     }
   }
diff --git a/sdks/python/apache_beam/transforms/sql.py b/sdks/python/apache_beam/transforms/sql.py
index 8b07d4b7f6a..21cae3f6c75 100644
--- a/sdks/python/apache_beam/transforms/sql.py
+++ b/sdks/python/apache_beam/transforms/sql.py
@@ -28,7 +28,9 @@ from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
 __all__ = ['SqlTransform']
 
 SqlTransformSchema = typing.NamedTuple(
-    'SqlTransformSchema', [('query', str), ('dialect', typing.Optional[str])])
+    'SqlTransformSchema',
+    [('query', str), ('dialect', typing.Optional[str]),
+     ('ddl', typing.Optional[str])])
 
 
 class SqlTransform(ExternalTransform):
@@ -75,12 +77,13 @@ class SqlTransform(ExternalTransform):
   """
   URN = 'beam:external:java:sql:v1'
 
-  def __init__(self, query, dialect=None, expansion_service=None):
+  def __init__(self, query, dialect=None, ddl=None, expansion_service=None):
     """
     Creates a SqlTransform which will be expanded to Java's SqlTransform.
     (See class docs).
     :param query: The SQL query.
     :param dialect: (optional) The dialect, e.g. use 'zetasql' for ZetaSQL.
+    :param ddl: (optional) The DDL statement.
     :param expansion_service: (optional) The URL of the expansion service to use
     """
     expansion_service = expansion_service or BeamJarExpansionService(
@@ -88,5 +91,5 @@ class SqlTransform(ExternalTransform):
     super().__init__(
         self.URN,
         NamedTupleBasedPayloadBuilder(
-            SqlTransformSchema(query=query, dialect=dialect)),
+            SqlTransformSchema(query=query, dialect=dialect, ddl=ddl)),
         expansion_service=expansion_service)
diff --git a/sdks/python/apache_beam/transforms/sql_test.py b/sdks/python/apache_beam/transforms/sql_test.py
index a7da253c461..a87cf266d4b 100644
--- a/sdks/python/apache_beam/transforms/sql_test.py
+++ b/sdks/python/apache_beam/transforms/sql_test.py
@@ -209,6 +209,30 @@ class SqlTransformTest(unittest.TestCase):
           | SqlTransform("SELECT * FROM PCOLLECTION WHERE shopper = 'alice'"))
       assert_that(out, equal_to([('alice', {'apples': 2, 'bananas': 3})]))
 
+  def test_sql_ddl_set_option(self):
+    with TestPipeline() as p:
+      input_data = [
+          beam.Row(id=1, value=10),
+          beam.Row(id=2, value=20),
+          beam.Row(id=3, value=30)
+      ]
+      # DDL uses SET to modify a session option (tests DDL parsing)
+      # Using a known Calcite option like sqlConformance
+      ddl_statement = """
+          SET sqlConformance = 'LENIENT'
+      """
+      # Query still operates on the implicit PCOLLECTION
+      query_statement = "SELECT * FROM PCOLLECTION WHERE id > 2"
+
+      # Input PCollection is piped directly
+      out = (
+          p | beam.Create(input_data)
+          # Pass both the query and the DDL
+          | SqlTransform(query=query_statement, ddl=ddl_statement))
+
+      # Verify the output matches the query (unaffected by the SET DDL)
+      assert_that(out, equal_to([(3, 30)]))
+
 
 if __name__ == "__main__":
   logging.getLogger().setLevel(logging.INFO)

Reply via email to