This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 76708d54438 added ddl to SqlTransform (#34614)
76708d54438 is described below
commit 76708d54438c663c1d3112329c4ce896a5796ced
Author: liferoad <[email protected]>
AuthorDate: Sun Apr 13 08:26:49 2025 -0400
added ddl to SqlTransform (#34614)
* added ddl to SqlTransform
* uncommented tests
---
.../expansion/ExternalSqlTransformRegistrar.java | 9 ++++++++
sdks/python/apache_beam/transforms/sql.py | 9 +++++---
sdks/python/apache_beam/transforms/sql_test.py | 24 ++++++++++++++++++++++
3 files changed, 39 insertions(+), 3 deletions(-)
diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
index 86f2260c974..d7a9bb28896 100644
--- a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
+++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
@@ -50,6 +50,7 @@ public class ExternalSqlTransformRegistrar implements ExternalTransformRegistrar
public static class Configuration {
String query = "";
@Nullable String dialect;
+ @Nullable String ddl;
public void setQuery(String query) {
this.query = query;
@@ -58,6 +59,10 @@ public class ExternalSqlTransformRegistrar implements ExternalTransformRegistrar
public void setDialect(@Nullable String dialect) {
this.dialect = dialect;
}
+
+ public void setDdl(@Nullable String ddl) {
+ this.ddl = ddl;
+ }
}
private static class Builder
@@ -76,6 +81,10 @@ public class ExternalSqlTransformRegistrar implements ExternalTransformRegistrar
}
transform = transform.withQueryPlannerClass(queryPlanner);
}
+ // Add any DDL string
+ if (configuration.ddl != null) {
+ transform = transform.withDdlString(configuration.ddl);
+ }
return transform;
}
}
diff --git a/sdks/python/apache_beam/transforms/sql.py b/sdks/python/apache_beam/transforms/sql.py
index 8b07d4b7f6a..21cae3f6c75 100644
--- a/sdks/python/apache_beam/transforms/sql.py
+++ b/sdks/python/apache_beam/transforms/sql.py
@@ -28,7 +28,9 @@ from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
__all__ = ['SqlTransform']
SqlTransformSchema = typing.NamedTuple(
- 'SqlTransformSchema', [('query', str), ('dialect', typing.Optional[str])])
+ 'SqlTransformSchema',
+ [('query', str), ('dialect', typing.Optional[str]),
+ ('ddl', typing.Optional[str])])
class SqlTransform(ExternalTransform):
@@ -75,12 +77,13 @@ class SqlTransform(ExternalTransform):
"""
URN = 'beam:external:java:sql:v1'
- def __init__(self, query, dialect=None, expansion_service=None):
+ def __init__(self, query, dialect=None, ddl=None, expansion_service=None):
"""
Creates a SqlTransform which will be expanded to Java's SqlTransform.
(See class docs).
:param query: The SQL query.
:param dialect: (optional) The dialect, e.g. use 'zetasql' for ZetaSQL.
+ :param ddl: (optional) The DDL statement.
:param expansion_service: (optional) The URL of the expansion service to
use
"""
expansion_service = expansion_service or BeamJarExpansionService(
@@ -88,5 +91,5 @@ class SqlTransform(ExternalTransform):
super().__init__(
self.URN,
NamedTupleBasedPayloadBuilder(
- SqlTransformSchema(query=query, dialect=dialect)),
+ SqlTransformSchema(query=query, dialect=dialect, ddl=ddl)),
expansion_service=expansion_service)
diff --git a/sdks/python/apache_beam/transforms/sql_test.py b/sdks/python/apache_beam/transforms/sql_test.py
index a7da253c461..a87cf266d4b 100644
--- a/sdks/python/apache_beam/transforms/sql_test.py
+++ b/sdks/python/apache_beam/transforms/sql_test.py
@@ -209,6 +209,30 @@ class SqlTransformTest(unittest.TestCase):
| SqlTransform("SELECT * FROM PCOLLECTION WHERE shopper = 'alice'"))
assert_that(out, equal_to([('alice', {'apples': 2, 'bananas': 3})]))
+ def test_sql_ddl_set_option(self):
+ with TestPipeline() as p:
+ input_data = [
+ beam.Row(id=1, value=10),
+ beam.Row(id=2, value=20),
+ beam.Row(id=3, value=30)
+ ]
+ # DDL uses SET to modify a session option (tests DDL parsing)
+ # Using a known Calcite option like sqlConformance
+ ddl_statement = """
+ SET sqlConformance = 'LENIENT'
+ """
+ # Query still operates on the implicit PCOLLECTION
+ query_statement = "SELECT * FROM PCOLLECTION WHERE id > 2"
+
+ # Input PCollection is piped directly
+ out = (
+ p | beam.Create(input_data)
+ # Pass both the query and the DDL
+ | SqlTransform(query=query_statement, ddl=ddl_statement))
+
+ # Verify the output matches the query (unaffected by the SET DDL)
+ assert_that(out, equal_to([(3, 30)]))
+
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)