kamilwu commented on a change in pull request #11856:
URL: https://github.com/apache/beam/pull/11856#discussion_r498143457
##########
File path: sdks/python/apache_beam/testing/load_tests/sideinput_test.py
##########
@@ -122,85 +122,95 @@ def process(self, unused_element, side_input):
# No-op. We only make sure that the element is accessed.
next(it)
except StopIteration:
- return
+ break
class MappingSideInputTestDoFn(beam.DoFn):
- """Take a sequence of keys as an additional side input and for each
- key in the sequence checks the value for key in the dictionary."""
- def process(self, unused_element, dict_side_input, keys_to_check):
- for key in keys_to_check:
- # No-op. We only make sure that the element is accessed.
- dict_side_input[key]
-
- class GetRandomKeys(beam.DoFn):
- def __init__(self, n):
- self._n = n
+ """Iterates over first n keys in the dictionary and checks the value."""
+ def __init__(self, first_n):
+ self._first_n = first_n
def process(self, unused_element, dict_side_input):
- import random
- n = min(self._n, len(dict_side_input))
- return random.sample(dict_side_input.keys(), n)
+ i = 0
+ for key in dict_side_input:
+ if i == self._first_n:
+ break
+ # No-op. We only make sure that the element is accessed.
+ dict_side_input[key]
+ i += 1
- class AddEventTimestamps(beam.DoFn):
- """Assign timestamp to each element of PCollection."""
- def setup(self):
- self._timestamp = 0
+ @typehints.with_input_types(int)
+ @typehints.with_output_types(int)
+ class AssignTimestamps(beam.DoFn):
+ """Produces timestamped values. Timestamps are equal to the value of the
+ element."""
+ def __init__(self):
+ # Avoid having to use save_main_session
+ self.window = window
Review comment:
Dataflow workers cannot use functions, classes, or module imports
defined in `__main__` (the global scope) unless the `--save_main_session`
pipeline option is specified. However, `--save_main_session` is likely to cause
problems (for example: https://issues.apache.org/jira/browse/BEAM-6158), and I
wanted to avoid using it. This is a workaround: storing a reference to the
module as an attribute of the DoFn instance means it is serialized along with
the DoFn, so workers can reach it through `self.window`. This lets me use the
`window` module without enabling `--save_main_session`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]