This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new dbc5c4e15b1 [Python] Fail WindowInto when there are no inputs to it
(#28946)
dbc5c4e15b1 is described below
commit dbc5c4e15b1378125e610286d005e56ee605b96b
Author: Ritesh Ghorse <[email protected]>
AuthorDate: Wed Oct 18 11:48:47 2023 -0400
[Python] Fail WindowInto when there are no inputs to it (#28946)
---
sdks/python/apache_beam/runners/common.py | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/sdks/python/apache_beam/runners/common.py
b/sdks/python/apache_beam/runners/common.py
index ed0dc2d9a0c..1cd0a304466 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -1929,6 +1929,12 @@ def validate_pipeline_graph(pipeline_proto):
raise ValueError(
"Incompatible input coder %s and output coder %s for transform %s"
%
(transform_id, input_coder, output_coder))
+ elif transform_proto.spec.urn == common_urns.primitives.ASSIGN_WINDOWS.urn:
+ if not transform_proto.inputs:
+ raise ValueError("Missing input for transform: %s" % transform_proto)
+ elif transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn:
+ if not transform_proto.inputs:
+ raise ValueError("Missing input for transform: %s" % transform_proto)
for t in transform_proto.subtransforms:
validate_transform(t)