kamilwu commented on a change in pull request #11136: [BEAM-7505] Add side input load test to Python SDK
URL: https://github.com/apache/beam/pull/11136#discussion_r395594609
 
 

 ##########
 File path: sdks/python/apache_beam/testing/load_tests/sideinput_test.py
 ##########
 @@ -47,65 +49,154 @@
 or:
 
 ./gradlew -PloadTest.args="
-    --publish_to_big_query=true
-    --project=...
-    --metrics_dataset=python_load_tests
-    --metrics_table=side_input
+    --side_input_type=iter
     --input_options='{
-      \"num_records\": 1,
-      \"key_size\": 1,
-      \"value_size\": 1}'
-    --runner=DirectRunner" \
+      \"num_records\": 300,
+      \"key_size\": 5,
+      \"value_size\": 15}'" \
 -PloadTest.mainClass=apache_beam.testing.load_tests.sideinput_test \
 -Prunner=DirectRunner :sdks:python:apache_beam:testing:load_tests:run
 """
 
 # pytype: skip-file
 
 from __future__ import absolute_import
+from __future__ import division
 
 import logging
 
 import apache_beam as beam
-from apache_beam.pvalue import AsIter
 from apache_beam.testing.load_tests.load_test import LoadTest
 from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
 from apache_beam.testing.synthetic_pipeline import SyntheticSource
 
 
 class SideInputTest(LoadTest):
+  SIDE_INPUT_TYPES = {
+      'iter': beam.pvalue.AsIter,
+      'list': beam.pvalue.AsList,
+      'dict': beam.pvalue.AsDict,
+  }
+
   def __init__(self):
     super(SideInputTest, self).__init__()
-    self.iterations = self.get_option_or_default(
-        'number_of_counter_operations', 1)
+    self.windows = self.get_option_or_default('window_count', default=0)
+    self.access_percentage = self.get_option_or_default(
+        'access_percentage', default=100)
+    if self.access_percentage < 0 or self.access_percentage > 100:
+      raise ValueError(
+          'access_percentage: Invalid value. Should be in range '
+          'from 0 to 100, got {} instead'.format(self.access_percentage))
+
+    self.side_input_size = self.get_option_or_default(
+        'side_input_size', default=0)
+    if self.side_input_size == 0:
+      self.side_input_size = self.input_options.get('num_records')
+
+    self.side_input_type = self.pipeline.get_option('side_input_type')
+    if self.side_input_type is None:
+      raise ValueError('side_input_type is required')
+
+  def materialize_as(self):
+    try:
+      return self.SIDE_INPUT_TYPES[self.side_input_type]
+    except KeyError:
+      raise ValueError(
+          'Unknown side input type. You have to provide one of '
+          'these: {}'.format(list(self.SIDE_INPUT_TYPES.keys())))
 
   def test(self):
-    def join_fn(element, side_input, iterations):
-      result = []
-      for i in range(iterations):
-        for key, value in side_input:
-          if i == iterations - 1:
-            result.append({key: element[1] + value})
-      yield result
-
-    main_input = (
+    class SequenceSideInputTestDoFn(beam.DoFn):
+      """Iterate over first n side_input elements. Iterate over all
+      elements if `first_n` is :data:`None`."""
+      def __init__(self, first_n=None):
+        self._first_n = first_n
+
+      def process(self, unused_element, side_input):
+        for i, _ in enumerate(side_input):
+          if self._first_n and i >= self._first_n:
+            return
+          # No-op. We only make sure that the element is accessed.
+
+    class MappingSideInputTestDoFn(beam.DoFn):
+      """Take a sequence of keys as an additional side input and for each
+      key in the sequence checks the value for key in the dictionary."""
+      def process(self, unused_element, dict_side_input, keys_to_check):
+        for key in keys_to_check:
+          # No-op. We only make sure that the element is accessed.
+          dict_side_input[key]
+
+    class GetRandomKeys(beam.DoFn):
+      def __init__(self, n):
+        self._n = n
+
+      def process(self, unused_element, dict_side_input):
+        import random
+        n = min(self._n, len(dict_side_input))
+        return random.sample(dict_side_input.keys(), n)
+
+    class AddEventTimestamps(beam.DoFn):
+      """Assign timestamp to each element of PCollection, starting from the
+      current Unix seconds-since-epoch timestamp."""
+      def setup(self):
+        import time
+        self.current_time = int(time.time())
 
 Review comment:
  No particular reason. I'll change the event timestamps to start at 0.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to