[ 
https://issues.apache.org/jira/browse/BEAM-7402?focusedWorklogId=253035&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-253035
 ]

ASF GitHub Bot logged work on BEAM-7402:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Jun/19 10:17
            Start Date: 03/Jun/19 10:17
    Worklog Time Spent: 10m 
      Work Description: kamilwu commented on pull request #8675: [BEAM-7402] 
BigQuery IO read performance tests 
URL: https://github.com/apache/beam/pull/8675#discussion_r289777487
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigquery_io_read_pipeline.py
 ##########
 @@ -14,66 +14,135 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-"""A Dataflow job that counts the number of rows in a BQ table.
-
-   Can be configured to simulate slow reading for a given number of rows.
+"""
+A pipeline that reads data from a BigQuery table and counts the number of
+rows.
+
+Besides the standard options, there are options with special meaning:
+* input_dataset - BQ dataset id.
+* input_table - BQ table id.
+The table will be created and populated with data from Synthetic Source if it
+does not exist.
+* input_options - options for Synthetic Source:
+num_records - number of rows to be inserted,
+value_size - the length of a single row,
+key_size - required option, but its value has no meaning,
+* num_slow - an integer in range [0,100] used to customize slow reading
+simulation.
+
+Example test run on DataflowRunner:
+
+python setup.py nosetests \
+    --test-pipeline-options="
+    --runner=TestDataflowRunner
+    --project=...
+    --staging_location=gs://...
+    --temp_location=gs://...
+    --sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+    --input_dataset=...
+    --input_table=...
+    --input_options='{
+    \"num_records\": 1024,
+    \"key_size\": 1,
+    \"value_size\": 1024,
+    }'" \
+    --tests apache_beam.io.gcp.tests.bigquery_io_read_pipeline
 """
 
 from __future__ import absolute_import
 
-import argparse
+import base64
 import logging
 import random
 import time
-
-import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
+import unittest
+
+from apache_beam import DoFn
+from apache_beam import Map
+from apache_beam import ParDo
+from apache_beam.io import BigQueryDisposition
+from apache_beam.io import BigQuerySource
+from apache_beam.io import Read
+from apache_beam.io import WriteToBigQuery
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.synthetic_pipeline import SyntheticSource
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
-
-
-class RowToStringWithSlowDown(beam.DoFn):
-
-  def process(self, element, num_slow=0, *args, **kwargs):
-
-    if num_slow == 0:
-      yield ['row']
-    else:
-      rand = random.random() * 100
-      if rand < num_slow:
-        time.sleep(0.01)
-        yield ['slow_row']
-      else:
+from apache_beam.transforms.combiners import Count
+
+
+class BigQueryIOReadTest(LoadTest):
+  def setUp(self):
+    super(BigQueryIOReadTest, self).setUp()
+    self.num_slow = self.pipeline.get_option('num_slow') or 0
+    self.input_dataset = self.pipeline.get_option('input_dataset')
+    self.input_table = self.pipeline.get_option('input_table')
+    self._check_for_input_data()
+
+  def _check_for_input_data(self):
+    """Checks if a BQ table with input data exists and creates it if not."""
+    wrapper = BigQueryWrapper()
+    try:
+      wrapper.get_table(self.project_id, self.input_dataset, self.input_table)
+    except Exception:
+      self._create_input_data()
+
+  def _create_input_data(self):
+    """
+    Runs an additional pipeline which creates test data and waits for its
+    completion.
+    """
+    SCHEMA = parse_table_schema_from_json(
+        '{"fields": [{"name": "data", "type": "BYTES"}]}')
+
+    def format_record(record):
+      # Since Synthetic Source returns data as a dictionary, we should skip
+      # one of the parts
+      return {'data': base64.b64encode(record[1])}
+
+    p = TestPipeline()
+    # pylint: disable=expression-not-assigned
+    (p
+     | 'Produce rows' >> Read(SyntheticSource(self.parseTestPipelineOptions()))
+     | 'Format' >> Map(format_record)
+     | 'Write to BigQuery' >> WriteToBigQuery(
+         self.input_dataset + '.' + self.input_table,
+         schema=SCHEMA,
+         create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
+         write_disposition=BigQueryDisposition.WRITE_EMPTY))
+    p.run().wait_until_finish()
+
+  class RowToStringWithSlowDown(DoFn):
+    def process(self, element, num_slow=0, *args, **kwargs):
+      if num_slow == 0:
         yield ['row']
+      else:
+        rand = random.random() * 100
+        if rand < num_slow:
+          time.sleep(0.01)
+          yield ['slow_row']
+        else:
+          yield ['row']
 
+  def test(self):
+    run_pipeline(self.pipeline, self.input_dataset, self.input_table,
 
 Review comment:
   Correct me if I'm wrong, but I think the slow reading doesn't care where 
the data comes from. Its purpose is just to slow down the execution.
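
   A minimal sketch of that point, written as a plain function rather than a 
DoFn so it runs without Beam. The function name and the `sleep_seconds`, `rng` 
and `sleep` parameters are hypothetical and are not part of the PR; they exist 
only so the delay can be shortened or stubbed out. Unlike the DoFn in the diff, 
it returns a string instead of yielding a list.

```python
import random
import time


def row_to_string_with_slow_down(element, num_slow=0, sleep_seconds=0.01,
                                 rng=random.random, sleep=time.sleep):
  """Labels one element as 'row' or 'slow_row' without inspecting it.

  num_slow is the percentage (0-100) of elements to delay. The rng and sleep
  arguments are hypothetical injection points added for this sketch.
  """
  if num_slow and rng() * 100 < num_slow:
    sleep(sleep_seconds)
    return 'slow_row'
  return 'row'


# The element is never read, so rows from any source behave the same way:
# a BigQuery-style dict, a key-value tuple, or a plain string.
inputs = [{'data': b'abc'}, ('key', 'value'), 'plain text']
always_slow = [row_to_string_with_slow_down(x, num_slow=100, sleep_seconds=0)
               for x in inputs]
never_slow = [row_to_string_with_slow_down(x, num_slow=0) for x in inputs]
print(always_slow)
print(never_slow)
```

   Since the element's content is ignored, the only inputs that affect the 
outcome are `num_slow` and the random draw.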
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 253035)
    Time Spent: 1h 10m  (was: 1h)

> Add a performance test for BigQueryIO read
> ------------------------------------------
>
>                 Key: BEAM-7402
>                 URL: https://issues.apache.org/jira/browse/BEAM-7402
>             Project: Beam
>          Issue Type: Test
>          Components: testing
>            Reporter: Kamil Wasilewski
>            Assignee: Kamil Wasilewski
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> The task is to add a performance test for BigQuery IO read with configurable 
> data size.
> The plan is:
> 1) Set up a BQ table with input data if needed,
> 2) Read data from that table and do some final assertions
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
