Vancior commented on code in PR #20396:
URL: https://github.com/apache/flink/pull/20396#discussion_r935187966
##########
flink-python/pyflink/fn_execution/embedded/operations.py:
##########
@@ -37,8 +37,28 @@ def close(self):
             operation.close()
     def on_timer(self, timestamp):
-        for operation in self._operations:
-            for item in operation.on_timer(timestamp):
+        results = self._main_operation.on_timer(timestamp)
+
+        if results:
+            results = self._process_elements(results)
+
+        if results:
+            yield from self._output_elements(results)
+
+    def _process_elements(self, elements):
+        def _process_elements_on_operation(op, items):
+            if items:
+                for item in items:
+                    yield from op.process_element(item)
Review Comment:
Do we still need to check whether `process_element` returns None here? `yield from` raises a TypeError if it is handed None.
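
To illustrate the concern, here is a minimal standalone sketch. It does not use the PyFlink classes: `process_element` below is a hypothetical stand-in that returns None when it has nothing to emit, and `process_unguarded` / `process_guarded` are illustrative names, not code from this PR.

```python
def process_element(item):
    # Hypothetical stand-in for an operation's process_element:
    # returns None for odd items (nothing to emit), a list otherwise.
    if item % 2:
        return None
    return [item * 10]


def process_unguarded(items):
    for item in items:
        # Raises TypeError as soon as process_element returns None.
        yield from process_element(item)


def process_guarded(items):
    for item in items:
        results = process_element(item)
        # Skips None and empty results before delegating.
        if results:
            yield from results
```

If `process_element` can never return None in the embedded path, the guard is unnecessary; otherwise something like the guarded variant would avoid the error.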