tysonjh commented on a change in pull request #11856:
URL: https://github.com/apache/beam/pull/11856#discussion_r497786859



##########
File path: sdks/python/apache_beam/testing/load_tests/sideinput_test.py
##########
@@ -122,85 +122,95 @@ def process(self, unused_element, side_input):
             # No-op. We only make sure that the element is accessed.
             next(it)
           except StopIteration:
-            return
+            break
 
     class MappingSideInputTestDoFn(beam.DoFn):

Review comment:
       Add type hints here?

##########
File path: sdks/python/apache_beam/testing/load_tests/sideinput_test.py
##########
@@ -122,85 +122,95 @@ def process(self, unused_element, side_input):
             # No-op. We only make sure that the element is accessed.
             next(it)
           except StopIteration:
-            return
+            break
 
     class MappingSideInputTestDoFn(beam.DoFn):
-      """Take a sequence of keys as an additional side input and for each
-      key in the sequence checks the value for key in the dictionary."""
-      def process(self, unused_element, dict_side_input, keys_to_check):
-        for key in keys_to_check:
-          # No-op. We only make sure that the element is accessed.
-          dict_side_input[key]
-
-    class GetRandomKeys(beam.DoFn):
-      def __init__(self, n):
-        self._n = n
+      """Iterates over first n keys in the dictionary and checks the value."""
+      def __init__(self, first_n):
+        self._first_n = first_n
 
       def process(self, unused_element, dict_side_input):
-        import random
-        n = min(self._n, len(dict_side_input))
-        return random.sample(dict_side_input.keys(), n)
+        i = 0
+        for key in dict_side_input:
+          if i == self._first_n:
+            break
+          # No-op. We only make sure that the element is accessed.
+          dict_side_input[key]
+          i += 1
 
-    class AddEventTimestamps(beam.DoFn):
-      """Assign timestamp to each element of PCollection."""
-      def setup(self):
-        self._timestamp = 0
+    @typehints.with_input_types(int)
+    @typehints.with_output_types(int)
+    class AssignTimestamps(beam.DoFn):
+      """Produces timestamped values. Timestamps are equal to the value of the
+      element."""
+      def __init__(self):
+        # Avoid having to use save_main_session
+        self.window = window

Review comment:
       What does this do? 

##########
File path: sdks/python/apache_beam/testing/load_tests/sideinput_test.py
##########
@@ -122,85 +122,95 @@ def process(self, unused_element, side_input):
             # No-op. We only make sure that the element is accessed.
             next(it)
           except StopIteration:
-            return
+            break
 
     class MappingSideInputTestDoFn(beam.DoFn):
-      """Take a sequence of keys as an additional side input and for each
-      key in the sequence checks the value for key in the dictionary."""
-      def process(self, unused_element, dict_side_input, keys_to_check):
-        for key in keys_to_check:
-          # No-op. We only make sure that the element is accessed.
-          dict_side_input[key]
-
-    class GetRandomKeys(beam.DoFn):
-      def __init__(self, n):
-        self._n = n
+      """Iterates over first n keys in the dictionary and checks the value."""
+      def __init__(self, first_n):
+        self._first_n = first_n
 
       def process(self, unused_element, dict_side_input):
-        import random
-        n = min(self._n, len(dict_side_input))
-        return random.sample(dict_side_input.keys(), n)
+        i = 0
+        for key in dict_side_input:
+          if i == self._first_n:
+            break
+          # No-op. We only make sure that the element is accessed.
+          dict_side_input[key]
+          i += 1
 
-    class AddEventTimestamps(beam.DoFn):
-      """Assign timestamp to each element of PCollection."""
-      def setup(self):
-        self._timestamp = 0
+    @typehints.with_input_types(int)
+    @typehints.with_output_types(int)
+    class AssignTimestamps(beam.DoFn):
+      """Produces timestamped values. Timestamps are equal to the value of the
+      element."""
+      def __init__(self):
+        # Avoid having to use save_main_session
+        self.window = window
 
       def process(self, element):
-        from apache_beam.transforms.combiners import window
-        yield window.TimestampedValue(element, self._timestamp)
-        self._timestamp += 1
-
-    input_pc = (
-        self.pipeline
-        | 'Read synthetic' >> beam.io.Read(
-            SyntheticSource(self.parse_synthetic_source_options()))
-        | 'Collect start time metrics' >> beam.ParDo(
-            MeasureTime(self.metrics_namespace)))
-
-    if self.side_input_size != self.input_options.get('num_records'):
-      side_input = (
-          input_pc
-          | 'Sample {} elements'.format(self.side_input_size) >>
-          beam.combiners.Sample.FixedSizeGlobally(self.side_input_size)
-          | 'Flatten a sequence' >> beam.FlatMap(lambda x: x))
+        yield self.window.TimestampedValue(element, element)
+
+    @typehints.with_input_types(Any)
+    @typehints.with_output_types(Dict[str, Union[int, str]])
+    class GetSyntheticSDFOptions(beam.DoFn):
+      def __init__(self, elements_per_record, key_size, value_size):
+        self.elements_per_record = elements_per_record
+        self.key_size = key_size
+        self.value_size = value_size
+
+      def process(self, unused_element):
+        yield {
+            'num_records': self.elements_per_record,
+            'key_size': self.key_size,
+            'value_size': self.value_size,
+            'initial_splitting_num_bundles': 0,
+            'initial_splitting_desired_bundle_size': 0,
+            'sleep_per_input_record_sec': 0,
+            'initial_splitting': 'const'
+        }
+
+    main_input = self.pipeline | 'Create' >> beam.Create(range(self.windows))
+
+    initial_elements = self.SDF_INITIAL_ELEMENTS
+    if self.windows > 1:
+      main_input = (
+          main_input
+          | 'Assign timestamps' >> beam.ParDo(AssignTimestamps())
+          | 'Apply windows' >> beam.WindowInto(window.FixedWindows(1)))
+      side_input = main_input
+      initial_elements = self.windows
     else:
-      side_input = input_pc
-
-    if self.windows > 0:
-      window_size = self.side_input_size / self.windows
-      logging.info('Fixed windows of %s seconds will be applied', window_size)
-      side_input = (
-          side_input
-          | 'Add event timestamps' >> beam.ParDo(AddEventTimestamps())
-          | 'Apply windows' >> beam.WindowInto(
-              beam.combiners.window.FixedWindows(window_size)))
+      side_input = self.pipeline | 'Side input: create' >> beam.Create(
+          range(initial_elements))

Review comment:
       Could you explain this a bit, please? It seems that when there are <= 1
windows, the side_input will consist of 1000 SDF sources, but I don't
understand why.

##########
File path: sdks/python/apache_beam/testing/load_tests/sideinput_test.py
##########
@@ -27,11 +27,7 @@
   * side_input_type (str) - Required. Specifies how the side input will be
     materialized in ParDo operation. Choose from (dict, iter, list).
   * window_count (int) - The number of fixed sized windows to subdivide the
-    side input into. By default, no windows will be used.
-  * side_input_size (int) - The size of the side input. Must be equal to or
-    lower than the size of the main input. If lower, the side input will be
-    created by applying a :class:`beam.combiners.Sample
-    <apache_beam.transforms.combiners.Sample>` transform.
+    side input into. By default, a global window will be used.

Review comment:
       This should mention that, by default, the side input will be subdivided
into 1000 windows.

##########
File path: sdks/python/apache_beam/testing/load_tests/sideinput_test.py
##########
@@ -122,85 +122,95 @@ def process(self, unused_element, side_input):
             # No-op. We only make sure that the element is accessed.
             next(it)
           except StopIteration:
-            return
+            break
 
     class MappingSideInputTestDoFn(beam.DoFn):

Review comment:
       Add type hints here too?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to