rohdesamuel commented on a change in pull request #11005: [BEAM-8335] Modify
the StreamingCache to subclass the CacheManager
URL: https://github.com/apache/beam/pull/11005#discussion_r387888483
##########
File path: sdks/python/apache_beam/runners/direct/transform_evaluator.py
##########
@@ -471,7 +537,44 @@ def process_element(self, element):
# We can either have the _TestStream or the _WatermarkController to emit
# the elements. We chose to emit in the _WatermarkController so that the
# element is emitted at the correct watermark value.
- for event in self.test_stream.events(self.current_index):
+ events = []
+ if self.watermark == MIN_TIMESTAMP:
+ for event in self.test_stream._set_up(self.test_stream.output_tags):
+ events.append(event)
+
+ if self.test_stream_event_channel:
+ try:
+ event = next(self.test_stream_event_channel)
Review comment:
Ack, unified it with the two methods in the test_stream_impl to return
iterators. These will be stored in the global EvaluationContext state.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services