This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ac2706a291b Fix timestamp issue in a yaml pipeline that calls SQL
transform (#35789)
ac2706a291b is described below
commit ac2706a291bb0217ea707050a6641b5630558c68
Author: Shunping Huang <[email protected]>
AuthorDate: Tue Aug 5 20:12:58 2025 -0400
Fix timestamp issue in a yaml pipeline that calls SQL transform (#35789)
---
sdks/python/apache_beam/typehints/schemas.py | 12 +++++++++++-
sdks/python/apache_beam/yaml/tests/sql.yaml | 18 ++++++++++++++++++
2 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/typehints/schemas.py
b/sdks/python/apache_beam/typehints/schemas.py
index a3d9e4d8bf7..32dc2fd06ec 100644
--- a/sdks/python/apache_beam/typehints/schemas.py
+++ b/sdks/python/apache_beam/typehints/schemas.py
@@ -151,11 +151,21 @@ def named_fields_to_schema(
if isinstance(names_and_types, dict):
names_and_types = names_and_types.items()
+ _, cached_schema = schema_registry.by_id.get(schema_id, (None, None))
+ if cached_schema:
+ type_by_name_from_schema = {
+ field.name: field.type
+ for field in cached_schema.fields
+ }
+ else:
+ type_by_name_from_schema = {}
+
schema = schema_pb2.Schema(
fields=[
schema_pb2.Field(
name=name,
- type=typing_to_runner_api(type),
+ type=type_by_name_from_schema.get(
+ name, typing_to_runner_api(type)),
options=[
option_to_runner_api(option_tuple)
for option_tuple in field_options.get(name, [])
diff --git a/sdks/python/apache_beam/yaml/tests/sql.yaml
b/sdks/python/apache_beam/yaml/tests/sql.yaml
index afa9e834fe9..0040a2790c5 100644
--- a/sdks/python/apache_beam/yaml/tests/sql.yaml
+++ b/sdks/python/apache_beam/yaml/tests/sql.yaml
@@ -75,3 +75,21 @@ pipelines:
- {a: "x", s: "2"}
- {a: "x", s: "3"}
- {a: "y", s: "10"}
+
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ name: CreateSampleData
+ config:
+ elements:
+ - { id: 1, name: "John" }
+ - { id: 2, name: "Jane" }
+ - type: Sql
+ name: sql
+ config:
+ query: >
+ SELECT *, CURRENT_TIMESTAMP AS ingest_timestamp FROM PCOLLECTION
+ - type: PyTransform
+ config:
+ constructor: apache_beam.transforms.util.LogElements