tvalentyn commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1237066077


##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for our libraries of combine PTransforms."""
+# pytype: skip-file
+
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+
+class CombineGloballyTest(unittest.TestCase):
+  def test_with_periodic_impulse(self):

Review Comment:
   Can we describe the tested behavior and the expected outcome in the test 
scenario name? For example: 
   
   
`test_combining_unbounded_pcoll_with_custom_windowing_raises_error_when_default_unspecified`
  
   
`test_unbounded_pcoll_with_custom_windowing_require_specifying_defaults_for_empty_windows`
   
   (feel free to adjust the name)
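
   A minimal sketch of the naming convention, using only the standard library. 
The helper `combine_with_defaults` is a hypothetical stand-in for the check 
under review, not Beam's API; only the shape of the test name matters here.

```python
import unittest


def combine_with_defaults(is_bounded, uses_default_windowing):
    """Hypothetical stand-in for the check under review; not Beam's API."""
    if not is_bounded and not uses_default_windowing:
        raise ValueError("defaults are ill-defined for this input")
    return "ok"


class CombineGloballyNamingSketch(unittest.TestCase):
    # The name states the input, the condition, and the expected outcome.
    def test_unbounded_pcoll_with_custom_windowing_raises_error_when_default_unspecified(
            self):
        with self.assertRaises(ValueError):
            combine_with_defaults(
                is_bounded=False, uses_default_windowing=False)
```

   A failure report for a test named this way already says which scenario 
broke, without opening the test body.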



##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2587,6 +2587,18 @@ def add_input_types(transform):
             "or CombineGlobally().as_singleton_view() to get the default "
             "output of the CombineFn if the input PCollection is empty.")
 
+      # return the error for this ill-defined streaming case
+      if not pcoll.is_bounded and not pcoll.windowing.is_default():
+        raise ValueError(
+            "For unbounded data sources, "
+            "default values are not yet supported in CombineGlobally() if the "
+            "output PCollection is not windowed by GlobalWindows"
+            " with DefaultTrigger. "

Review Comment:
   nit: let's move the leading space to the end of the previous line for consistency.
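
   To illustrate the nit: Python concatenates adjacent string literals, so 
moving the space changes only the source layout, not the message. The variable 
names below are illustrative.

```python
# Space at the start of the continuation line (as in the diff).
leading = (
    "output PCollection is not windowed by GlobalWindows"
    " with DefaultTrigger. ")

# Space at the end of the previous line (as suggested).
trailing = (
    "output PCollection is not windowed by GlobalWindows "
    "with DefaultTrigger. ")

# Both spellings build the same string.
assert leading == trailing
```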



##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more

Review Comment:
   Should we place this in combiners_test.py?



##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2587,6 +2587,18 @@ def add_input_types(transform):
             "or CombineGlobally().as_singleton_view() to get the default "
             "output of the CombineFn if the input PCollection is empty.")
 
+      # return the error for this ill-defined streaming case
+      if not pcoll.is_bounded and not pcoll.windowing.is_default():
+        raise ValueError(

Review Comment:
   1. Is this behavior applicable only to CombineGlobally, or also to 
CombinePerKey / CombineValues? Perhaps we should generalize the wording here 
if other tests also fail in this scenario.
   
   2. For my understanding, how is the default value defined if we do use the 
default trigger?
   
   3. Would this wording be more appropriate: 
   
   ```
   "When combining elements in unbounded collections with non-default windowing 
strategy, you must explicitly specify how to define the combined result of an 
empty window. "
   
   "Please use CombineGlobally().without_defaults() to output "
               "an empty PCollection if the input PCollection is empty, "
               "or CombineGlobally().as_singleton_view() to get the default "
               "output of the CombineFn if the input PCollection is empty.")
   ```            
   4. If users specify `as_singleton_view`, do we need to check that the 
underlying CombineFn `.has_defaults()`? Is a meaningful error thrown if it 
doesn't? 
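
   For discussion, a toy model of the decision these questions are about, 
written without Beam. `CombineRequest` and `validate` are hypothetical names, 
and the assumption that the new check applies only when neither 
`without_defaults()` nor `as_singleton_view()` was requested is mine, not 
something the diff confirms.

```python
from dataclasses import dataclass


@dataclass
class CombineRequest:
    """Hypothetical summary of a global combine; not a Beam class."""
    is_bounded: bool
    default_windowing: bool
    has_defaults: bool = True  # False models .without_defaults()
    as_view: bool = False  # True models .as_singleton_view()


def validate(req):
    """Raise if the result for an empty window would be ill-defined."""
    if not req.has_defaults or req.as_view:
        # The user stated explicitly how empty input is handled.
        return
    if not req.is_bounded and not req.default_windowing:
        raise ValueError(
            "When combining elements in unbounded collections with "
            "non-default windowing strategy, you must explicitly specify "
            "how to define the combined result of an empty window. ")


# Bounded input and explicit opt-outs pass; the ambiguous case raises.
validate(CombineRequest(is_bounded=True, default_windowing=False))
validate(CombineRequest(False, False, has_defaults=False))
validate(CombineRequest(False, False, as_view=True))
```

   Whether the `as_view` branch should additionally verify that the CombineFn 
can produce a default is exactly what point 4 asks; the sketch leaves that 
open.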



##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for our libraries of combine PTransforms."""
+# pytype: skip-file
+
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+
+class CombineGloballyTest(unittest.TestCase):
+  def test_with_periodic_impulse(self):
+    # this error is expected since the below combination is ill-defined.
+    with self.assertRaises(ValueError):
+      with TestPipeline() as p:
+        _ = (
+            p
+            | PeriodicImpulse(
+                start_timestamp=time.time(),
+                stop_timestamp=time.time() + 4,
+                fire_interval=1,
+                apply_windowing=False,
+            )
+            | beam.Map(lambda x: ('c', 1))
+            | beam.WindowInto(
+                window.GlobalWindows(),
+                trigger=trigger.Repeatedly(trigger.AfterCount(2)),
+                accumulation_mode=trigger.AccumulationMode.DISCARDING,
+            )
+            | beam.combiners.Count.Globally()

Review Comment:
   Does this test pass if we use 
`beam.combiners.Count.Globally().as_singleton_view()`? Should we also add such 
a scenario?



##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2587,6 +2587,18 @@ def add_input_types(transform):
             "or CombineGlobally().as_singleton_view() to get the default "
             "output of the CombineFn if the input PCollection is empty.")
 
+      # return the error for this ill-defined streaming case
+      if not pcoll.is_bounded and not pcoll.windowing.is_default():

Review Comment:
   Could you please add a docstring for `Windowing.is_default()` while you are 
at it? I wonder if you actually wanted to refer to 
`CombineGlobally.has_defaults` here...



##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for our libraries of combine PTransforms."""
+# pytype: skip-file
+
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+
+class CombineGloballyTest(unittest.TestCase):
+  def test_with_periodic_impulse(self):
+    # this error is expected since the below combination is ill-defined.
+    with self.assertRaises(ValueError):
+      with TestPipeline() as p:
+        _ = (
+            p
+            | PeriodicImpulse(
+                start_timestamp=time.time(),
+                stop_timestamp=time.time() + 4,
+                fire_interval=1,
+                apply_windowing=False,
+            )
+            | beam.Map(lambda x: ('c', 1))
+            | beam.WindowInto(
+                window.GlobalWindows(),
+                trigger=trigger.Repeatedly(trigger.AfterCount(2)),
+                accumulation_mode=trigger.AccumulationMode.DISCARDING,
+            )
+            | beam.combiners.Count.Globally()
+            | "Print Windows" >> beam.Map(print))

Review Comment:
   nit: unnecessary line?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
