kamilwu commented on a change in pull request #11136: [BEAM-7505] Add side input load test to Python SDK
URL: https://github.com/apache/beam/pull/11136#discussion_r395594161
########## File path: sdks/python/apache_beam/testing/load_tests/sideinput_test.py ########## @@ -47,65 +49,154 @@ or: ./gradlew -PloadTest.args=" - --publish_to_big_query=true - --project=... - --metrics_dataset=python_load_tests - --metrics_table=side_input + --side_input_type=iter --input_options='{ - \"num_records\": 1, - \"key_size\": 1, - \"value_size\": 1}' - --runner=DirectRunner" \ + \"num_records\": 300, + \"key_size\": 5, + \"value_size\": 15}'" \ -PloadTest.mainClass=apache_beam.testing.load_tests.sideinput_test \ -Prunner=DirectRunner :sdks:python:apache_beam:testing:load_tests:run """ # pytype: skip-file from __future__ import absolute_import +from __future__ import division import logging import apache_beam as beam -from apache_beam.pvalue import AsIter from apache_beam.testing.load_tests.load_test import LoadTest from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime from apache_beam.testing.synthetic_pipeline import SyntheticSource class SideInputTest(LoadTest): + SIDE_INPUT_TYPES = { + 'iter': beam.pvalue.AsIter, + 'list': beam.pvalue.AsList, + 'dict': beam.pvalue.AsDict, + } + def __init__(self): super(SideInputTest, self).__init__() - self.iterations = self.get_option_or_default( - 'number_of_counter_operations', 1) + self.windows = self.get_option_or_default('window_count', default=0) + self.access_percentage = self.get_option_or_default( + 'access_percentage', default=100) + if self.access_percentage < 0 or self.access_percentage > 100: + raise ValueError( + 'access_percentage: Invalid value. 
Should be in range ' + 'from 0 to 100, got {} instead'.format(self.access_percentage)) + + self.side_input_size = self.get_option_or_default( + 'side_input_size', default=0) + if self.side_input_size == 0: + self.side_input_size = self.input_options.get('num_records') + + self.side_input_type = self.pipeline.get_option('side_input_type') + if self.side_input_type is None: + raise ValueError('side_input_type is required') + + def materialize_as(self): + try: + return self.SIDE_INPUT_TYPES[self.side_input_type] + except KeyError: + raise ValueError( + 'Unknown side input type. You have to provide one of ' + 'these: {}'.format(list(self.SIDE_INPUT_TYPES.keys()))) def test(self): - def join_fn(element, side_input, iterations): - result = [] - for i in range(iterations): - for key, value in side_input: - if i == iterations - 1: - result.append({key: element[1] + value}) - yield result - - main_input = ( + class SequenceSideInputTestDoFn(beam.DoFn): + """Iterate over first n side_input elements. Iterate over all + elements if `first_n` is :data:`None`.""" + def __init__(self, first_n=None): + self._first_n = first_n + + def process(self, unused_element, side_input): + for i, _ in enumerate(side_input): + if self._first_n and i >= self._first_n: + return + # No-op. We only make sure that the element is accessed. Review comment: No, as soon as `i == self._first_n`, the `if` statement is hit and the function returns immediately. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services