robertwb commented on a change in pull request #14095:
URL: https://github.com/apache/beam/pull/14095#discussion_r583978273



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -443,23 +661,22 @@ def _make_safe_windowing_strategy(self, id):
     windowing_strategy_proto = self.pipeline_components.windowing_strategies[id]
     if windowing_strategy_proto.window_fn.urn in SAFE_WINDOW_FNS:
       return id
-    elif (windowing_strategy_proto.merge_status ==
-          beam_runner_api_pb2.MergeStatus.NON_MERGING) or True:
+    else:
       safe_id = id + '_safe'
       while safe_id in self.pipeline_components.windowing_strategies:
         safe_id += '_'
       safe_proto = copy.copy(windowing_strategy_proto)
-      safe_proto.window_fn.urn = GenericNonMergingWindowFn.URN
-      safe_proto.window_fn.payload = (
-          windowing_strategy_proto.window_coder_id.encode('utf-8'))
+      if (windowing_strategy_proto.merge_status ==
+          beam_runner_api_pb2.MergeStatus.NON_MERGING):
+        safe_proto.window_fn.urn = GenericNonMergingWindowFn.URN
+        safe_proto.window_fn.payload = (
+            windowing_strategy_proto.window_coder_id.encode('utf-8'))
+      else:

Review comment:
       GenericMergingWindowFn handles arbitrary merging window fns over the
FnAPI (I've been testing this with Java on the ULR). Good point about the else,
though: I'm now explicitly testing for NEEDS_MERGE and throwing an exception
otherwise.
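
A minimal sketch of the explicit check described above, not the code in the PR. It is self-contained: `MergeStatus` is a stand-in for `beam_runner_api_pb2.MergeStatus`, the two URN strings are placeholders rather than the real `GenericNonMergingWindowFn.URN` and `GenericMergingWindowFn.URN` values, and the helper name `pick_safe_window_fn_urn` and the choice of `NotImplementedError` are assumptions made for illustration.

```python
import enum


class MergeStatus(enum.Enum):
  # Stand-in for beam_runner_api_pb2.MergeStatus; numeric values are
  # illustrative only.
  UNSPECIFIED = 0
  NON_MERGING = 1
  NEEDS_MERGE = 2
  ALREADY_MERGED = 3


# Placeholder URNs (assumptions), not the constants defined in Beam.
NON_MERGING_URN = 'placeholder:generic-non-merging'
MERGING_URN = 'placeholder:generic-merging'


def pick_safe_window_fn_urn(merge_status):
  """Hypothetical helper: choose the replacement window fn URN."""
  if merge_status == MergeStatus.NON_MERGING:
    return NON_MERGING_URN
  elif merge_status == MergeStatus.NEEDS_MERGE:
    return MERGING_URN
  else:
    # Anything else is rejected loudly instead of falling through
    # a bare else branch.
    raise NotImplementedError(
        'Unsupported merge status: %s' % merge_status)
```

With this shape, an unexpected status such as ALREADY_MERGED fails immediately rather than being silently treated as a merging strategy.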





