This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dbc5c4e15b1 [Python] Fail WindowInto when there are no inputs to it 
(#28946)
dbc5c4e15b1 is described below

commit dbc5c4e15b1378125e610286d005e56ee605b96b
Author: Ritesh Ghorse <[email protected]>
AuthorDate: Wed Oct 18 11:48:47 2023 -0400

    [Python] Fail WindowInto when there are no inputs to it (#28946)
---
 sdks/python/apache_beam/runners/common.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/sdks/python/apache_beam/runners/common.py 
b/sdks/python/apache_beam/runners/common.py
index ed0dc2d9a0c..1cd0a304466 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -1929,6 +1929,12 @@ def validate_pipeline_graph(pipeline_proto):
         raise ValueError(
             "Incompatible input coder %s and output coder %s for transform %s" 
%
             (transform_id, input_coder, output_coder))
+    elif transform_proto.spec.urn == common_urns.primitives.ASSIGN_WINDOWS.urn:
+      if not transform_proto.inputs:
+        raise ValueError("Missing input for transform: %s" % transform_proto)
+    elif transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn:
+      if not transform_proto.inputs:
+        raise ValueError("Missing input for transform: %s" % transform_proto)
 
     for t in transform_proto.subtransforms:
       validate_transform(t)

Reply via email to