This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new cdea885 [BEAM-6553] A Python SDK sink that supports File Loads into BQ (#7655) cdea885 is described below commit cdea885872b3be7de9ba22f22700be89f7d53766 Author: Pablo <pabl...@users.noreply.github.com> AuthorDate: Sat Feb 16 18:58:36 2019 -0800 [BEAM-6553] A Python SDK sink that supports File Loads into BQ (#7655) * First sketch for BQ File Loads in Python SDK * Sketching multiple destinations functionality * BigQuery file loads wired up to WriteToBigQuery * Creating temporary tables for atomicity on multiple load jobs Improving todos, documentation * Improving logging * Fixing DirectRunner ITs * Deleting temporary tables and testng corner cases * Fixing lint issue * Removing one integration test. Addressing comments. * Rewindowing input * Managing the limit of files per job --- .../io/gcp/big_query_query_to_table_it_test.py | 7 + .../io/gcp/big_query_query_to_table_pipeline.py | 7 +- sdks/python/apache_beam/io/gcp/bigquery.py | 97 +++- .../apache_beam/io/gcp/bigquery_file_loads.py | 601 +++++++++++++++++++++ .../apache_beam/io/gcp/bigquery_file_loads_test.py | 462 ++++++++++++++++ sdks/python/apache_beam/io/gcp/bigquery_tools.py | 92 +++- .../apache_beam/io/gcp/tests/bigquery_matcher.py | 58 +- .../runners/dataflow/dataflow_runner.py | 21 - sdks/python/apache_beam/transforms/ptransform.py | 6 +- 9 files changed, 1296 insertions(+), 55 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py index 43db185..8980261 100644 --- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py +++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py @@ -128,8 +128,11 @@ class BigQueryQueryToTableIT(unittest.TestCase): project=self.project, query=verify_query, checksum=expected_checksum)] + + gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table extra_opts = {'query': LEGACY_QUERY, 'output': self.output_table, + 'gs_location': gs_location, 'output_schema': DIALECT_OUTPUT_SCHEMA, 'use_standard_sql': False, 'on_success_matcher': all_of(*pipeline_verifiers)} @@ -144,8 +147,10 @@ class BigQueryQueryToTableIT(unittest.TestCase): project=self.project, query=verify_query, checksum=expected_checksum)] + gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table extra_opts = {'query': STANDARD_QUERY, 'output': self.output_table, + 'gs_location': gs_location, 'output_schema': DIALECT_OUTPUT_SCHEMA, 'use_standard_sql': True, 'on_success_matcher': all_of(*pipeline_verifiers)} @@ -186,9 +191,11 @@ class BigQueryQueryToTableIT(unittest.TestCase): query=verify_query, checksum=expected_checksum)] self._setup_new_types_env() + gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table extra_opts = { 'query': NEW_TYPES_QUERY % (self.dataset_id, NEW_TYPES_INPUT_TABLE), 'output': self.output_table, + 'gs_location': gs_location, 'output_schema': NEW_TYPES_OUTPUT_SCHEMA, 'use_standard_sql': False, 'on_success_matcher': all_of(*pipeline_verifiers)} diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py index 26b418a..b35f4e5 100644 --- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py +++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py @@ -50,6 +50,9 @@ def run_bq_pipeline(argv=None): help='Output BQ table to write results to.') 
parser.add_argument('--kms_key', default=None, help='Use this Cloud KMS key with BigQuery.') + parser.add_argument('--gs_location', + default=None, + help='GCS bucket location to use to store files.') known_args, pipeline_args = parser.parse_known_args(argv) table_schema = parse_table_schema_from_json(known_args.output_schema) @@ -62,12 +65,12 @@ def run_bq_pipeline(argv=None): (p | 'read' >> beam.io.Read(beam.io.BigQuerySource( query=known_args.query, use_standard_sql=known_args.use_standard_sql, kms_key=kms_key)) - | 'write' >> beam.io.Write(beam.io.BigQuerySink( + | 'write' >> beam.io.WriteToBigQuery( known_args.output, schema=table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY, - kms_key=known_args.kms_key))) + gs_location=known_args.gs_location)) result = p.run() result.wait_until_finish() diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 8f3011b..b1c1f90 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -134,6 +134,7 @@ from apache_beam.internal.gcp.json_value import to_json_value from apache_beam.io.gcp import bigquery_tools from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.options.pipeline_options import GoogleCloudOptions +from apache_beam.options.pipeline_options import StandardOptions from apache_beam.runners.dataflow.native_io import iobase as dataflow_io from apache_beam.transforms import DoFn from apache_beam.transforms import ParDo @@ -449,6 +450,10 @@ bigquery_v2_messages.TableSchema` object. match the expected format. """ + import warnings + warnings.warn("This class is deprecated and will be permanently removed " + "in a future version of beam.") + # Import here to avoid adding the dependency for local running scenarios. try: # pylint: disable=wrong-import-order, wrong-import-position @@ -646,18 +651,32 @@ class BigQueryWriteFn(DoFn): class WriteToBigQuery(PTransform): - def __init__(self, table, dataset=None, project=None, schema=None, + def __init__(self, + table, + dataset=None, + project=None, + schema=None, create_disposition=BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=BigQueryDisposition.WRITE_APPEND, - batch_size=None, kms_key=None, test_client=None): + kms_key=None, + batch_size=None, + max_file_size=None, + max_files_per_bundle=None, + test_client=None, + gs_location=None): """Initialize a WriteToBigQuery transform. Args: - table (str): The ID of the table. The ID must contain only letters - ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If dataset - argument is :data:`None` then the table argument must contain the - entire table reference specified as: ``'DATASET.TABLE'`` or - ``'PROJECT:DATASET.TABLE'``. + table (str, callable): The ID of the table, or a callable + that returns it. The ID must contain only letters ``a-z``, ``A-Z``, + numbers ``0-9``, or underscores ``_``. If dataset argument is + :data:`None` then the table argument must contain the entire table + reference specified as: ``'DATASET.TABLE'`` + or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one + argument representing an element to be written to BigQuery, and return + a TableReference, or a string table name as specified above. + Multiple destinations are only supported on Batch pipelines at the + moment. 
dataset (str): The ID of the dataset containing this table or :data:`None` if the table reference is specified entirely by the table argument. @@ -691,12 +710,23 @@ bigquery_v2_messages.TableSchema` empty. For streaming pipelines WriteTruncate can not be used. - - batch_size (int): Number of rows to be written to BQ per streaming API - insert. kms_key (str): Experimental. Optional Cloud KMS key name for use when creating new tables. + batch_size (int): Number of rows to be written to BQ per streaming API + insert. The default is 500. + insert. test_client: Override the default bigquery client used for testing. + max_file_size (int): The maximum size for a file to be written and then + loaded into BigQuery. The default value is 4TB, which is 80% of the + limit of 5TB for BigQuery to load any file. + max_files_per_bundle(int): The maximum number of files to be concurrently + written by a worker. The default here is 20. Larger values will allow + writing to multiple destinations without having to reshard - but they + increase the memory burden on the workers. + gs_location (str): A GCS location to store files to be used for file + loads into BigQuery. By default, this will use the pipeline's + temp_location, but for pipelines whose temp_location is not appropriate + for BQ File Loads, users should pass a specific one. """ self.table_reference = bigquery_tools.parse_table_reference( table, dataset, project) @@ -708,6 +738,9 @@ bigquery_v2_messages.TableSchema` self.batch_size = batch_size self.kms_key = kms_key self.test_client = test_client + self.gs_location = gs_location + self.max_file_size = max_file_size + self.max_files_per_bundle = max_files_per_bundle @staticmethod def get_table_schema_from_string(schema): @@ -788,20 +821,40 @@ bigquery_v2_messages.TableSchema): raise TypeError('Unexpected schema argument: %s.' % schema) def expand(self, pcoll): - if self.table_reference.projectId is None: + p = pcoll.pipeline + + # TODO(pabloem): Use a different method to determine if streaming or batch. + standard_options = p.options.view_as(StandardOptions) + + if (not callable(self.table_reference) + and self.table_reference.projectId is None): self.table_reference.projectId = pcoll.pipeline.options.view_as( GoogleCloudOptions).project - bigquery_write_fn = BigQueryWriteFn( - table_id=self.table_reference.tableId, - dataset_id=self.table_reference.datasetId, - project_id=self.table_reference.projectId, - batch_size=self.batch_size, - schema=self.get_dict_table_schema(self.schema), - create_disposition=self.create_disposition, - write_disposition=self.write_disposition, - kms_key=self.kms_key, - test_client=self.test_client) - return pcoll | 'WriteToBigQuery' >> ParDo(bigquery_write_fn) + + if standard_options.streaming: + # TODO: Support load jobs for streaming pipelines. 
+ bigquery_write_fn = BigQueryWriteFn( + table_id=self.table_reference.tableId, + dataset_id=self.table_reference.datasetId, + project_id=self.table_reference.projectId, + batch_size=self.batch_size, + schema=self.get_dict_table_schema(self.schema), + create_disposition=self.create_disposition, + write_disposition=self.write_disposition, + kms_key=self.kms_key, + test_client=self.test_client) + return pcoll | 'WriteToBigQuery' >> ParDo(bigquery_write_fn) + else: + from apache_beam.io.gcp import bigquery_file_loads + return pcoll | bigquery_file_loads.BigQueryBatchFileLoads( + destination=self.table_reference, + schema=self.get_dict_table_schema(self.schema), + create_disposition=self.create_disposition, + write_disposition=self.write_disposition, + max_file_size=self.max_file_size, + max_files_per_bundle=self.max_files_per_bundle, + gs_location=self.gs_location, + test_client=self.test_client) def display_data(self): res = {} diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py new file mode 100644 index 0000000..96ba2ab --- /dev/null +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -0,0 +1,601 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Functionality to perform file loads into BigQuery for Batch and Streaming +pipelines. + +This source is able to work around BigQuery load quotas and limitations. When +destinations are dynamic, or when data for a single job is too large, the data +will be split into multiple jobs. + +NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES. +""" + +from __future__ import absolute_import + +import datetime +import hashlib +import itertools +import logging +import random +import time +import uuid + +from future.utils import iteritems + +import apache_beam as beam +from apache_beam import pvalue +from apache_beam.io import filesystems as fs +from apache_beam.io.gcp import bigquery_tools +from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api +from apache_beam.options import value_provider as vp +from apache_beam.transforms.combiners import Count + +ONE_TERABYTE = (1 << 40) + +# The maximum file size for imports is 5TB. We keep our files under that. +_DEFAULT_MAX_FILE_SIZE = 4 * ONE_TERABYTE + +_DEFAULT_MAX_WRITERS_PER_BUNDLE = 20 + +# The maximum size for a single load job is 15 terabytes. +_MAXIMUM_LOAD_SIZE = 15 * ONE_TERABYTE + +# BigQuery only supports up to 10 thousand URIs for a single load job. +_MAXIMUM_SOURCE_URIS = 10*1000 + + +def _generate_load_job_name(): + datetime_component = datetime.datetime.now().strftime("%Y_%m_%d_%H%M%S") + # TODO(pabloem): include job id / pipeline component?
+ return 'beam_load_%s_%s' % (datetime_component, random.randint(0, 100)) + + +def _generate_file_prefix(pipeline_gcs_location): + # If a gcs location is provided to the pipeline, then we shall use that. + # Otherwise, we shall use the temp_location from pipeline options. + gcs_base = str(pipeline_gcs_location or + vp.RuntimeValueProvider.get_value('temp_location', str, '')) + prefix_uuid = _bq_uuid() + return fs.FileSystems.join(gcs_base, 'bq_load', prefix_uuid) + + +def _make_new_file_writer(file_prefix, destination): + if isinstance(destination, bigquery_api.TableReference): + destination = '%s:%s.%s' % ( + destination.projectId, destination.datasetId, destination.tableId) + + directory = fs.FileSystems.join(file_prefix, destination) + + if not fs.FileSystems.exists(directory): + fs.FileSystems.mkdirs(directory) + + file_name = str(uuid.uuid4()) + file_path = fs.FileSystems.join(file_prefix, destination, file_name) + + return file_path, fs.FileSystems.create(file_path, 'application/text') + + +def _bq_uuid(seed=None): + if not seed: + return str(uuid.uuid4()).replace("-", "") + else: + return str(hashlib.md5(seed).hexdigest()) + + +class _AppendDestinationsFn(beam.DoFn): + """Adds the destination to an element, making it a KV pair. + + Outputs a PCollection of KV-pairs where the key is a TableReference for the + destination, and the value is the record itself. + + Experimental; no backwards compatibility guarantees. + """ + + def __init__(self, destination): + if callable(destination): + self.destination = destination + else: + self.destination = lambda x: destination + + def process(self, element): + yield (self.destination(element), element) + + +class _ShardDestinations(beam.DoFn): + """Adds a shard number to the key of the KV element. + + Experimental; no backwards compatibility guarantees.""" + DEFAULT_SHARDING_FACTOR = 10 + + def __init__(self, sharding_factor=DEFAULT_SHARDING_FACTOR): + self.sharding_factor = sharding_factor + + def start_bundle(self): + self._shard_count = random.randrange(self.sharding_factor) + + def process(self, element): + destination = element[0] + row = element[1] + + sharded_destination = (destination, + self._shard_count % self.sharding_factor) + self._shard_count += 1 + yield (sharded_destination, row) + + +class WriteRecordsToFile(beam.DoFn): + """Write input records to files before triggering a load job. + + This transform keeps up to ``max_files_per_bundle`` files open to write to. It + receives (destination, record) tuples, and it writes the records to different + files for each destination. + + If there are more than ``max_files_per_bundle`` destinations that we need to + write to, then those records are grouped by their destination, and later + written to files by ``WriteGroupedRecordsToFile``. + + It outputs two PCollections. + """ + + UNWRITTEN_RECORD_TAG = 'UnwrittenRecords' + WRITTEN_FILE_TAG = 'WrittenFiles' + + def __init__(self, + max_files_per_bundle=_DEFAULT_MAX_WRITERS_PER_BUNDLE, + max_file_size=_DEFAULT_MAX_FILE_SIZE, + coder=None): + """Initialize a :class:`WriteRecordsToFile`. + + Args: + max_files_per_bundle (int): The maximum number of files that can be kept + open during execution of this step in a worker. This is to avoid over- + whelming the worker memory. + max_file_size (int): The maximum size in bytes for a file to be used in + an export job. 
+ + """ + self.max_files_per_bundle = max_files_per_bundle + self.max_file_size = max_file_size + self.coder = coder or bigquery_tools.RowAsDictJsonCoder() + + def display_data(self): + return { + 'max_files_per_bundle': self.max_files_per_bundle, + 'max_file_size': str(self.max_file_size), + 'coder': self.coder.__class__.__name__ + } + + def start_bundle(self): + self._destination_to_file_writer = {} + + def process(self, element, file_prefix): + destination = element[0] + row = element[1] + + if destination in self._destination_to_file_writer: + writer = self._destination_to_file_writer[destination] + elif len(self._destination_to_file_writer) < self.max_files_per_bundle: + (file_path, writer) = _make_new_file_writer(file_prefix, destination) + self._destination_to_file_writer[destination] = writer + yield pvalue.TaggedOutput(WriteRecordsToFile.WRITTEN_FILE_TAG, + (destination, file_path)) + else: + yield pvalue.TaggedOutput( + WriteRecordsToFile.UNWRITTEN_RECORD_TAG, element) + return + + # TODO(pabloem): Is it possible for this to throw exception? + writer.write(self.coder.encode(row)) + writer.write(b'\n') + + if writer.tell() > self.max_file_size: + writer.close() + self._destination_to_file_writer.pop(destination) + + def finish_bundle(self): + for _, writer in iteritems(self._destination_to_file_writer): + writer.close() + self._destination_to_file_writer = {} + + +class WriteGroupedRecordsToFile(beam.DoFn): + """Receives collection of dest-iterable(records), writes it to files. + + This is different from ``WriteRecordsToFile`` because it receives records + grouped by destination. This means that it's not necessary to keep multiple + file descriptors open, because we know for sure when records for a single + destination have been written out. + + Experimental; no backwards compatibility guarantees. + """ + + def __init__(self, max_file_size=_DEFAULT_MAX_FILE_SIZE, + coder=None): + self.max_file_size = max_file_size + self.coder = coder or bigquery_tools.RowAsDictJsonCoder() + + def process(self, element, file_prefix): + destination = element[0] + rows = element[1] + + writer = None + + for row in rows: + if writer is None: + (file_path, writer) = _make_new_file_writer(file_prefix, destination) + yield (destination, file_path) + + writer.write(self.coder.encode(row)) + writer.write(b'\n') + + if writer.tell() > self.max_file_size: + writer.close() + writer = None + + +class TriggerCopyJobs(beam.DoFn): + """Launches jobs to copy from temporary tables into the main target table. + + When a job needs to write to multiple destination tables, or when a single + destination table needs to have multiple load jobs to write to it, files are + loaded into temporary tables, and those tables are later copied to the + destination tables. + + This transform emits (destination, job_reference) pairs. + """ + def __init__(self, + create_disposition=None, + write_disposition=None, + test_client=None, + temporary_tables=False): + self.create_disposition = create_disposition + self.write_disposition = write_disposition + self.test_client = test_client + self.temporary_tables = temporary_tables + + def start_bundle(self): + self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client) + + def process(self, element, job_name_prefix=None): + destination = element[0] + job_reference = element[1] + + if not self.temporary_tables: + # If we did not use temporary tables, then we do not need to trigger any + # copy jobs. 
+ return + + copy_to_reference = bigquery_tools.parse_table_reference(destination) + if copy_to_reference.projectId is None: + copy_to_reference.projectId = vp.RuntimeValueProvider.get_value( + 'project', str, '') + + copy_from_reference = bigquery_tools.parse_table_reference(destination) + copy_from_reference.tableId = job_reference.jobId + if copy_from_reference.projectId is None: + copy_from_reference.projectId = vp.RuntimeValueProvider.get_value( + 'project', str, '') + + copy_job_name = '%s_copy_%s_to_%s' % ( + job_name_prefix, + _bq_uuid('%s:%s.%s' % (copy_from_reference.projectId, + copy_from_reference.datasetId, + copy_from_reference.tableId)), + _bq_uuid('%s:%s.%s' % (copy_to_reference.projectId, + copy_to_reference.datasetId, + copy_to_reference.tableId))) + + logging.info("Triggering copy job from %s to %s", + copy_from_reference, copy_to_reference) + job_reference = self.bq_wrapper._insert_copy_job( + copy_to_reference.projectId, + copy_job_name, + copy_from_reference, + copy_to_reference, + create_disposition=self.create_disposition, + write_disposition=self.write_disposition) + + yield (destination, job_reference) + + +class TriggerLoadJobs(beam.DoFn): + """Triggers the import jobs to BQ. + + Experimental; no backwards compatibility guarantees. + """ + + TEMP_TABLES = 'TemporaryTables' + + def __init__(self, + schema=None, + create_disposition=None, + write_disposition=None, + test_client=None, + temporary_tables=False): + self.schema = schema + self.test_client = test_client + self.temporary_tables = temporary_tables + if self.temporary_tables: + # If we are loading into temporary tables, we rely on the default create + # and write dispositions, which mean that a new table will be created. + self.create_disposition = None + self.write_disposition = None + else: + self.create_disposition = create_disposition + self.write_disposition = write_disposition + + def display_data(self): + result = {'create_disposition': str(self.create_disposition), + 'write_disposition': str(self.write_disposition)} + if self.schema is not None: + result['schema'] = str(self.schema) + else: + result['schema'] = 'AUTODETECT' + + return result + + def start_bundle(self): + self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client) + + def process(self, element, load_job_name_prefix): + destination = element[0] + files = iter(element[1]) + + job_count = 0 + batch_of_files = list(itertools.islice(files, _MAXIMUM_SOURCE_URIS)) + while batch_of_files: + + table_reference = bigquery_tools.parse_table_reference(destination) + if table_reference.projectId is None: + table_reference.projectId = vp.RuntimeValueProvider.get_value( + 'project', str, '') + + # Load jobs for a single destination are always triggered from the same + # worker. This means that we can generate a deterministic numbered job id, + # and not need to worry about collisions. + job_name = '%s_%s_%s' % ( + load_job_name_prefix, + _bq_uuid('%s:%s.%s' % (table_reference.projectId, + table_reference.datasetId, + table_reference.tableId)), + job_count) + logging.debug("Batch of files has %s files. Job name is %s", + len(batch_of_files), job_name) + + if self.temporary_tables: + # For temporary tables, we create a new table whose name is the job name.
+ table_reference.tableId = job_name + yield pvalue.TaggedOutput(TriggerLoadJobs.TEMP_TABLES, table_reference) + + logging.info("Triggering job %s to load data to BigQuery table %s.", + job_name, table_reference) + job_reference = self.bq_wrapper.perform_load_job( + table_reference, batch_of_files, job_name, + schema=self.schema, + write_disposition=self.write_disposition, + create_disposition=self.create_disposition) + yield (destination, job_reference) + + # Prepare to trigger the next job + job_count += 1 + batch_of_files = list(itertools.islice(files, _MAXIMUM_SOURCE_URIS)) + + +class WaitForBQJobs(beam.DoFn): + """Takes in a series of BQ job names as side input, and waits for all of them. + + If any job fails, it will fail. If all jobs succeed, it will succeed. + + Experimental; no backwards compatibility guarantees. + """ + ALL_DONE = object() + FAILED = object() + WAITING = object() + + def __init__(self, test_client): + self.test_client = test_client + + def start_bundle(self): + self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client) + + def process(self, element, dest_ids_list): + job_references = [elm[1] for elm in dest_ids_list] + + while True: + status = self._check_job_states(job_references) + if status == WaitForBQJobs.FAILED: + raise Exception( + 'BigQuery jobs failed. BQ error: %s', self._latest_error) + elif status == WaitForBQJobs.ALL_DONE: + return dest_ids_list # Pass the list of destination-jobs downstream + time.sleep(10) + + def _check_job_states(self, job_references): + for ref in job_references: + job = self.bq_wrapper.get_job(ref.projectId, + ref.jobId, + ref.location) + + logging.info("Job status: %s", job.status) + if job.status.state == 'DONE' and job.status.errorResult: + logging.warn("Job %s seems to have failed. Error Result: %s", + ref.jobId, job.status.errorResult) + self._latest_error = job.status + return WaitForBQJobs.FAILED + elif job.status.state == 'DONE': + continue + + return WaitForBQJobs.ALL_DONE + + +class DeleteTablesFn(beam.DoFn): + def __init__(self, test_client=None): + self.test_client = test_client + + def start_bundle(self): + self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client) + + def process(self, table_reference): + logging.info("Deleting table %s", table_reference) + table_reference = bigquery_tools.parse_table_reference(table_reference) + self.bq_wrapper._delete_table( + table_reference.projectId, + table_reference.datasetId, + table_reference.tableId) + + +class BigQueryBatchFileLoads(beam.PTransform): + """Takes in a set of elements, and inserts them to BigQuery via batch loads. 
+ + """ + + DESTINATION_JOBID_PAIRS = 'destination_load_jobid_pairs' + DESTINATION_FILE_PAIRS = 'destination_file_pairs' + DESTINATION_COPY_JOBID_PAIRS = 'destination_copy_jobid_pairs' + + def __init__( + self, + destination, + schema=None, + gs_location=None, + create_disposition=None, + write_disposition=None, + coder=None, + max_file_size=None, + max_files_per_bundle=None, + test_client=None): + self.destination = destination + self.create_disposition = create_disposition + self.write_disposition = write_disposition + self.max_file_size = max_file_size or _DEFAULT_MAX_FILE_SIZE + self.max_files_per_bundle = (max_files_per_bundle or + _DEFAULT_MAX_WRITERS_PER_BUNDLE) + self._input_gs_location = gs_location + self.test_client = test_client + self.schema = schema + self.coder = coder or bigquery_tools.RowAsDictJsonCoder() + + # If we have multiple destinations, then we will have multiple load jobs, + # thus we will need temporary tables for atomicity. + # If the destination is a single one, we assume that we will have only one + # job to run - and thus we avoid using temporary tables + self.temp_tables = True if callable(destination) else False + + def expand(self, pcoll): + p = pcoll.pipeline + + load_job_name_pcv = pvalue.AsSingleton( + p + | "ImpulseJobName" >> beam.Create([None]) + | beam.Map(lambda _: _generate_load_job_name())) + + file_prefix_pcv = pvalue.AsSingleton( + p + | "CreateFilePrefixView" >> beam.Create([self._input_gs_location]) + | "GenerateFilePrefix" >> beam.Map(_generate_file_prefix)) + + outputs = ( + pcoll + | "ApplyGlobalWindow" >> beam.WindowInto(beam.window.GlobalWindows()) + | "AppendDestination" >> beam.ParDo(_AppendDestinationsFn( + self.destination)) + | beam.ParDo( + WriteRecordsToFile(max_files_per_bundle=self.max_files_per_bundle, + max_file_size=self.max_file_size, + coder=self.coder), + file_prefix=file_prefix_pcv).with_outputs( + WriteRecordsToFile.UNWRITTEN_RECORD_TAG, + WriteRecordsToFile.WRITTEN_FILE_TAG)) + + # A PCollection of (destination, file) tuples. It lists files with records, + # and the destination each file is meant to be imported into. + destination_files_kv_pc = outputs[WriteRecordsToFile.WRITTEN_FILE_TAG] + + # A PCollection of (destination, record) tuples. These are later sharded, + # grouped, and all records for each destination-shard are written to files. + # This PCollection is necessary because not all records can be written into + # files in ``WriteRecordsToFile``. + unwritten_records_pc = outputs[WriteRecordsToFile.UNWRITTEN_RECORD_TAG] + + more_destination_files_kv_pc = ( + unwritten_records_pc + | beam.ParDo(_ShardDestinations()) + | "GroupShardedRows" >> beam.GroupByKey() + | "DropShardNumber" >> beam.Map(lambda x: (x[0][0], x[1])) + | "WriteGroupedRecordsToFile" >> beam.ParDo(WriteGroupedRecordsToFile( + coder=self.coder), file_prefix=file_prefix_pcv) + ) + + all_destination_file_pairs_pc = ( + (destination_files_kv_pc, more_destination_files_kv_pc) + | "DestinationFilesUnion" >> beam.Flatten()) + + grouped_files_pc = ( + all_destination_file_pairs_pc + | "GroupFilesByTableDestinations" >> beam.GroupByKey()) + + # Load Jobs are triggered to temporary tables, and those are later copied to + # the appropriate destination table. This ensures atomicity when only + # some of the load jobs would fail but not others. + # If any of them fails, then copy jobs are not triggered.
+ trigger_loads_outputs = ( + grouped_files_pc | beam.ParDo(TriggerLoadJobs( + schema=self.schema, + write_disposition=self.write_disposition, + create_disposition=self.create_disposition, + test_client=self.test_client, + temporary_tables=self.temp_tables), load_job_name_pcv).with_outputs( + TriggerLoadJobs.TEMP_TABLES, main='main') + ) + + destination_job_ids_pc = trigger_loads_outputs['main'] + temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES] + + destination_copy_job_ids_pc = ( + p + | "ImpulseMonitorLoadJobs" >> beam.Create([None]) + | "WaitForLoadJobs" >> beam.ParDo( + WaitForBQJobs(self.test_client), + beam.pvalue.AsList(destination_job_ids_pc)) + | beam.ParDo(TriggerCopyJobs( + create_disposition=self.create_disposition, + write_disposition=self.write_disposition, + temporary_tables=self.temp_tables, + test_client=self.test_client), load_job_name_pcv)) + + finished_copy_jobs_pc = (p + | "ImpulseMonitorCopyJobs" >> beam.Create([None]) + | "WaitForCopyJobs" >> beam.ParDo( + WaitForBQJobs(self.test_client), + beam.pvalue.AsList(destination_copy_job_ids_pc) + )) + + _ = (finished_copy_jobs_pc + | "RemoveTempTables/PassTables" >> beam.FlatMap( + lambda x, deleting_tables: deleting_tables, + pvalue.AsIter(temp_tables_pc)) + | "RemoveTempTables/DeduplicateTables" >> Count.PerElement() + | "RemoveTempTables/GetTableNames" >> beam.Map(lambda elm: elm[0]) + | "RemoveTempTables/Delete" >> beam.ParDo(DeleteTablesFn())) + + return { + self.DESTINATION_JOBID_PAIRS: destination_job_ids_pc, + self.DESTINATION_FILE_PAIRS: all_destination_file_pairs_pc, + self.DESTINATION_COPY_JOBID_PAIRS: destination_copy_job_ids_pc, + } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py new file mode 100644 index 0000000..6a1166d --- /dev/null +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -0,0 +1,462 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
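The new test module below exercises the transform assembled above. As a quick orientation, here is a minimal, hypothetical sketch (project, dataset, table, and bucket names are made up) of how a batch pipeline reaches it through WriteToBigQuery; in batch mode the write returns the dictionary of result PCollections built in expand():

    # Minimal sketch, not part of the commit; names are hypothetical. The schema
    # is omitted, so the load job autodetects it from the staged JSON files.
    import logging

    import apache_beam as beam
    from apache_beam.io.gcp import bigquery_file_loads as bqfl

    with beam.Pipeline() as p:
      results = (
          p
          | beam.Create([{'name': 'beam', 'language': 'py'}])
          | beam.io.WriteToBigQuery(
              'myproject:mydataset.mytable',
              gs_location='gs://my-bucket/bq_loads'))

      # In batch mode the write expands to BigQueryBatchFileLoads, so `results`
      # is the dictionary of result PCollections returned by its expand() method.
      _ = (results[bqfl.BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS]
           | 'LogJobs' >> beam.Map(
               lambda kv: logging.info('Load job for %s: %s', kv[0], kv[1])))

The unit tests that follow assert on exactly these result PCollections, using a mocked BigQuery client.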
+# + +"""Unit tests for BigQuery file loads utilities.""" + +from __future__ import absolute_import + +import json +import logging +import os +import random +import time +import unittest + +import mock +from hamcrest.core import assert_that as hamcrest_assert +from hamcrest.core.core.allof import all_of +from hamcrest.core.core.is_ import is_ +from nose.plugins.attrib import attr + +import apache_beam as beam +from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp +from apache_beam.io.gcp import bigquery_file_loads as bqfl +from apache_beam.io.gcp import bigquery +from apache_beam.io.gcp import bigquery_tools +from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api +from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +try: + from apitools.base.py.exceptions import HttpError +except ImportError: + HttpError = None + + +_DESTINATION_ELEMENT_PAIRS = [ + # DESTINATION 1 + ('project1:dataset1.table1', '{"name":"beam", "language":"py"}'), + ('project1:dataset1.table1', '{"name":"beam", "language":"java"}'), + ('project1:dataset1.table1', '{"name":"beam", "language":"go"}'), + ('project1:dataset1.table1', '{"name":"flink", "language":"java"}'), + ('project1:dataset1.table1', '{"name":"flink", "language":"scala"}'), + + # DESTINATION 3 + ('project1:dataset1.table3', '{"name":"spark", "language":"scala"}'), + + # DESTINATION 1 + ('project1:dataset1.table1', '{"name":"spark", "language":"py"}'), + ('project1:dataset1.table1', '{"name":"spark", "language":"scala"}'), + + # DESTINATION 2 + ('project1:dataset1.table2', '{"name":"beam", "foundation":"apache"}'), + ('project1:dataset1.table2', '{"name":"flink", "foundation":"apache"}'), + ('project1:dataset1.table2', '{"name":"spark", "foundation":"apache"}'), +] + +_NAME_LANGUAGE_ELEMENTS = [ + json.loads(elm[1]) + for elm in _DESTINATION_ELEMENT_PAIRS if "language" in elm[1] +] + + +_DISTINCT_DESTINATIONS = list( + set([elm[0] for elm in _DESTINATION_ELEMENT_PAIRS])) + + +_ELEMENTS = list([json.loads(elm[1]) for elm in _DESTINATION_ELEMENT_PAIRS]) + + +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') +class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp): + maxDiff = None + + def _consume_input(self, fn, checks=None): + if checks is None: + return + + with TestPipeline() as p: + output_pcs = ( + p + | beam.Create(_DESTINATION_ELEMENT_PAIRS) + | beam.ParDo(fn, self.tmpdir) + .with_outputs(fn.WRITTEN_FILE_TAG, fn.UNWRITTEN_RECORD_TAG)) + + checks(output_pcs) + return output_pcs + + def test_files_created(self): + """Test that the files are created and written.""" + + fn = bqfl.WriteRecordsToFile() + self.tmpdir = self._new_tempdir() + + def check_files_created(output_pcs): + dest_file_pc = output_pcs[bqfl.WriteRecordsToFile.WRITTEN_FILE_TAG] + + files = dest_file_pc | "GetFiles" >> beam.Map(lambda x: x[1]) + file_count = files | "CountFiles" >> beam.combiners.Count.Globally() + + _ = files | "FilesExist" >> beam.Map( + lambda x: hamcrest_assert(os.path.exists(x), is_(True))) + assert_that(file_count, equal_to([3]), label='check file count') + + destinations = dest_file_pc | "GetDests" >> beam.Map(lambda x: x[0]) + assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)), + label='check destinations ') + + self._consume_input(fn, check_files_created) + + def test_many_files(self): + """Forces records 
to be written to many files. + + For each destination multiple files are necessary. This is because the max + file length is very small, so only a couple records fit in each file. + """ + + fn = bqfl.WriteRecordsToFile(max_file_size=50) + self.tmpdir = self._new_tempdir() + + def check_many_files(output_pcs): + dest_file_pc = output_pcs[bqfl.WriteRecordsToFile.WRITTEN_FILE_TAG] + + files_per_dest = (dest_file_pc + | beam.Map(lambda x: x).with_output_types( + beam.typehints.KV[str, str]) + | beam.combiners.Count.PerKey()) + assert_that(files_per_dest, + equal_to([('project1:dataset1.table1', 4), + ('project1:dataset1.table2', 2), + ('project1:dataset1.table3', 1)])) + + # Check that the files exist + _ = dest_file_pc | beam.Map(lambda x: x[1]) | beam.Map( + lambda x: hamcrest_assert(os.path.exists(x), is_(True))) + + self._consume_input(fn, check_many_files) + + def test_records_are_spilled(self): + """Forces records to be written to many files. + + For each destination multiple files are necessary, and at most two files can + be created. This forces records to be spilled to the next stage of + processing. + """ + + fn = bqfl.WriteRecordsToFile(max_files_per_bundle=2) + self.tmpdir = self._new_tempdir() + + def check_many_files(output_pcs): + dest_file_pc = output_pcs[bqfl.WriteRecordsToFile.WRITTEN_FILE_TAG] + spilled_records_pc = output_pcs[ + bqfl.WriteRecordsToFile.UNWRITTEN_RECORD_TAG] + + spilled_records_count = (spilled_records_pc | + beam.combiners.Count.Globally()) + assert_that(spilled_records_count, equal_to([3]), label='spilled count') + + files_per_dest = (dest_file_pc + | beam.Map(lambda x: x).with_output_types( + beam.typehints.KV[str, str]) + | beam.combiners.Count.PerKey()) + + # Only table1 and table3 get files. table2 records get spilled. + assert_that(files_per_dest, + equal_to([('project1:dataset1.table1', 1), + ('project1:dataset1.table3', 1)]), + label='file count') + + # Check that the files exist + _ = dest_file_pc | beam.Map(lambda x: x[1]) | beam.Map( + lambda x: hamcrest_assert(os.path.exists(x), is_(True))) + + self._consume_input(fn, check_many_files) + + +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') +class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp): + + def _consume_input(self, fn, input, checks): + if checks is None: + return + + with TestPipeline() as p: + res = (p + | beam.Create(input) + | beam.GroupByKey() + | beam.ParDo(fn, self.tmpdir)) + + checks(res) + return res + + def test_files_are_created(self): + """Test that the files are created and written.""" + + fn = bqfl.WriteGroupedRecordsToFile() + self.tmpdir = self._new_tempdir() + + def check_files_created(output_pc): + files = output_pc | "GetFiles" >> beam.Map(lambda x: x[1]) + file_count = files | "CountFiles" >> beam.combiners.Count.Globally() + + _ = files | "FilesExist" >> beam.Map( + lambda x: hamcrest_assert(os.path.exists(x), is_(True))) + assert_that(file_count, equal_to([3]), label='check file count') + + destinations = output_pc | "GetDests" >> beam.Map(lambda x: x[0]) + assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)), + label='check destinations ') + + self._consume_input( + fn, _DESTINATION_ELEMENT_PAIRS, check_files_created) + + def test_multiple_files(self): + """Forces records to be written to many files. + + For each destination multiple files are necessary. This is because the max + file length is very small, so only a couple records fit in each file. 
+ """ + fn = bqfl.WriteGroupedRecordsToFile(max_file_size=50) + self.tmpdir = self._new_tempdir() + + def check_multiple_files(output_pc): + files_per_dest = output_pc | beam.combiners.Count.PerKey() + assert_that(files_per_dest, + equal_to([('project1:dataset1.table1', 4), + ('project1:dataset1.table2', 2), + ('project1:dataset1.table3', 1), ])) + + # Check that the files exist + _ = output_pc | beam.Map(lambda x: x[1]) | beam.Map(os.path.exists) + + self._consume_input(fn, _DESTINATION_ELEMENT_PAIRS, check_multiple_files) + + +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') +class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp): + + def test_records_traverse_transform_with_mocks(self): + destination = 'project1:dataset1.table1' + + job_reference = bigquery_api.JobReference() + job_reference.projectId = 'project1' + job_reference.jobId = 'job_name1' + result_job = bigquery_api.Job() + result_job.jobReference = job_reference + + mock_job = mock.Mock() + mock_job.status.state = 'DONE' + mock_job.status.errorResult = None + mock_job.jobReference = job_reference + + bq_client = mock.Mock() + bq_client.jobs.Get.return_value = mock_job + + bq_client.jobs.Insert.return_value = result_job + + transform = bigquery.WriteToBigQuery( + destination, + gs_location=self._new_tempdir(), + test_client=bq_client) + + # Need to test this with the DirectRunner to avoid serializing mocks + with TestPipeline('DirectRunner') as p: + outputs = p | beam.Create(_ELEMENTS) | transform + + dest_files = outputs[bqfl.BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS] + dest_job = outputs[bqfl.BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS] + + jobs = dest_job | "GetJobs" >> beam.Map(lambda x: x[1]) + + files = dest_files | "GetFiles" >> beam.Map(lambda x: x[1]) + destinations = (dest_files + | "GetUniques" >> beam.combiners.Count.PerKey() + | "GetDests" >> beam.Map(lambda x: x[0])) + + # All files exist + _ = (files | beam.Map( + lambda x: hamcrest_assert(os.path.exists(x), is_(True)))) + + # One file per destination + assert_that(files | beam.combiners.Count.Globally(), + equal_to([1]), + label='CountFiles') + + assert_that(destinations, + equal_to([bigquery_tools.parse_table_reference(destination)]), + label='CheckDestinations') + + assert_that(jobs, + equal_to([job_reference]), label='CheckJobs') + + +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') +class BigQueryFileLoadsIT(unittest.TestCase): + + BIG_QUERY_DATASET_ID = 'python_bq_file_loads_' + BIG_QUERY_SCHEMA = ( + '{"fields": [{"name": "name","type": "STRING"},' + '{"name": "language","type": "STRING"}]}' + ) + + BIG_QUERY_SCHEMA_2 = ( + '{"fields": [{"name": "name","type": "STRING"},' + '{"name": "foundation","type": "STRING"}]}' + ) + + def setUp(self): + self.test_pipeline = TestPipeline(is_integration_test=True) + self.runner_name = type(self.test_pipeline.runner).__name__ + self.project = self.test_pipeline.get_option('project') + + self.dataset_id = '%s%s%d' % (self.BIG_QUERY_DATASET_ID, + str(int(time.time())), + random.randint(0, 10000)) + self.bigquery_client = bigquery_tools.BigQueryWrapper() + self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id) + self.output_table = "%s.output_table" % (self.dataset_id) + logging.info("Created dataset %s in project %s", + self.dataset_id, self.project) + + @attr('IT') + def test_multiple_destinations_transform(self): + output_table_1 = '%s%s' % (self.output_table, 1) + output_table_2 = '%s%s' % (self.output_table, 2) + output_table_3 = '%s%s' % 
(self.output_table, 3) + output_table_4 = '%s%s' % (self.output_table, 4) + pipeline_verifiers = [ + BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM %s" % output_table_1, + data=[(d['name'], d['language']) + for d in _ELEMENTS + if 'language' in d]), + BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM %s" % output_table_2, + data=[(d['name'], d['foundation']) + for d in _ELEMENTS + if 'foundation' in d]), + BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM %s" % output_table_3, + data=[(d['name'], d['language']) + for d in _ELEMENTS + if 'language' in d]), + BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM %s" % output_table_4, + data=[(d['name'], d['foundation']) + for d in _ELEMENTS + if 'foundation' in d])] + + args = self.test_pipeline.get_full_options_as_args( + on_success_matcher=all_of(*pipeline_verifiers)) + + with beam.Pipeline(argv=args) as p: + input = p | beam.Create(_ELEMENTS) + + # Get all input in same machine + input = (input + | beam.Map(lambda x: (None, x)) + | beam.GroupByKey() + | beam.FlatMap(lambda elm: elm[1])) + + _ = (input | + "WriteWithMultipleDestsFreely" >> bigquery.WriteToBigQuery( + table=lambda x: (output_table_1 + if 'language' in x + else output_table_2), + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY)) + + _ = (input | + "WriteWithMultipleDests" >> bigquery.WriteToBigQuery( + table=lambda x: (output_table_3 + if 'language' in x + else output_table_4), + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY, + max_file_size=20, + max_files_per_bundle=-1)) + + @attr('IT') + def test_one_job_fails_all_jobs_fail(self): + + # If one of the import jobs fails, then other jobs must not be performed. + # This is to avoid reinsertion of some records when a pipeline fails and + # is rerun. 
+ output_table_1 = '%s%s' % (self.output_table, 1) + output_table_2 = '%s%s' % (self.output_table, 2) + + self.bigquery_client.get_or_create_table( + self.project, self.dataset_id, output_table_1.split('.')[1], + bigquery_tools.parse_table_schema_from_json(self.BIG_QUERY_SCHEMA), + None, None) + self.bigquery_client.get_or_create_table( + self.project, self.dataset_id, output_table_2.split('.')[1], + bigquery_tools.parse_table_schema_from_json(self.BIG_QUERY_SCHEMA_2), + None, None) + + pipeline_verifiers = [ + BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM %s" % output_table_1, + data=[]), + BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM %s" % output_table_2, + data=[])] + + args = self.test_pipeline.get_full_options_as_args() + + with self.assertRaises(Exception): + with beam.Pipeline(argv=args) as p: + input = p | beam.Create(_ELEMENTS) + input2 = p | "Broken record" >> beam.Create(['language_broken_record']) + + input = (input, input2) | beam.Flatten() + + _ = (input | + "WriteWithMultipleDests" >> bigquery.WriteToBigQuery( + table=lambda x: (output_table_1 + if 'language' in x + else output_table_2), + create_disposition=( + beam.io.BigQueryDisposition.CREATE_IF_NEEDED), + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)) + + hamcrest_assert(p, all_of(*pipeline_verifiers)) + + def tearDown(self): + request = bigquery_api.BigqueryDatasetsDeleteRequest( + projectId=self.project, datasetId=self.dataset_id, + deleteContents=True) + try: + logging.info("Deleting dataset %s in project %s", + self.dataset_id, self.project) + self.bigquery_client.client.datasets.Delete(request) + except HttpError: + logging.debug('Failed to clean up dataset %s in project %s', + self.dataset_id, self.project) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 93d5299..2f4fd52 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -21,6 +21,8 @@ Classes, constants and functions in this file are experimental and have no backwards compatibility guarantees. These tools include wrappers and clients to interact with BigQuery APIs. + +NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES. """ from __future__ import absolute_import @@ -127,7 +129,7 @@ def parse_table_reference(table, dataset=None, project=None): argument. Returns: - A bigquery.TableReference object. + A TableReference for the table name that was provided. 
Raises: ValueError: if the table reference as a string does not match the expected @@ -136,6 +138,8 @@ def parse_table_reference(table, dataset=None, project=None): if isinstance(table, bigquery.TableReference): return table + elif callable(table): + return table table_reference = bigquery.TableReference() # If dataset argument is not specified, the expectation is that the @@ -256,24 +260,67 @@ class BigQueryWrapper(object): @retry.with_exponential_backoff( num_retries=MAX_RETRIES, retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def _insert_load_job(self, project_id, job_id, table_reference, source_uris, - schema=None): + def _insert_copy_job(self, + project_id, + job_id, + from_table_reference, + to_table_reference, + create_disposition=None, + write_disposition=None): + reference = bigquery.JobReference() + reference.jobId = job_id + reference.projectId = project_id + request = bigquery.BigqueryJobsInsertRequest( + projectId=project_id, + job=bigquery.Job( + configuration=bigquery.JobConfiguration( + copy=bigquery.JobConfigurationTableCopy( + destinationTable=to_table_reference, + sourceTable=from_table_reference, + createDisposition=create_disposition, + writeDisposition=write_disposition, + ) + ), + jobReference=reference, + ) + ) + + logging.info("Inserting job request: %s", request) + response = self.client.jobs.Insert(request) + logging.info("Response was %s", response) + return response.jobReference + + @retry.with_exponential_backoff( + num_retries=MAX_RETRIES, + retry_filter=retry.retry_on_server_errors_and_timeout_filter) + def _insert_load_job(self, + project_id, + job_id, + table_reference, + source_uris, + schema=None, + write_disposition=None, + create_disposition=None): reference = bigquery.JobReference(jobId=job_id, projectId=project_id) request = bigquery.BigqueryJobsInsertRequest( - projectId=table_reference.project_id, + projectId=project_id, job=bigquery.Job( configuration=bigquery.JobConfiguration( load=bigquery.JobConfigurationLoad( - source_uris=source_uris, - destination_table=table_reference, + sourceUris=source_uris, + destinationTable=table_reference, + schema=schema, + writeDisposition=write_disposition, + createDisposition=create_disposition, + sourceFormat='NEWLINE_DELIMITED_JSON', + autodetect=schema is None, ) ), jobReference=reference, ) ) - response = self.client.jobs.Insert(request) - return response.jobReference.jobId + return response.jobReference @retry.with_exponential_backoff( num_retries=MAX_RETRIES, @@ -476,6 +523,35 @@ class BigQueryWrapper(object): @retry.with_exponential_backoff( num_retries=MAX_RETRIES, retry_filter=retry.retry_on_server_errors_and_timeout_filter) + def get_job(self, project, job_id, location=None): + request = bigquery.BigqueryJobsGetRequest() + request.jobId = job_id + request.projectId = project + request.location = location + + return self.client.jobs.Get(request) + + def perform_load_job(self, + destination, + files, + job_id, + schema=None, + write_disposition=None, + create_disposition=None): + """Starts a job to load data into BigQuery. + + Returns: + bigquery.JobReference with the information about the job that was started. 
+ """ + return self._insert_load_job( + destination.projectId, job_id, destination, files, + schema=schema, + create_disposition=create_disposition, + write_disposition=write_disposition) + + @retry.with_exponential_backoff( + num_retries=MAX_RETRIES, + retry_filter=retry.retry_on_server_errors_and_timeout_filter) def get_or_create_table( self, project_id, dataset_id, table_id, schema, create_disposition, write_disposition): diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py index c3069a3..5d17e89 100644 --- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py +++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py @@ -47,7 +47,7 @@ def retry_on_http_and_value_error(exception): class BigqueryMatcher(BaseMatcher): - """Matcher that verifies Bigquery data with given query. + """Matcher that verifies the checksum of Bigquery data with given query. Fetch Bigquery data with given query, compute a hash string and compare with expected checksum. @@ -106,3 +106,59 @@ class BigqueryMatcher(BaseMatcher): mismatch_description \ .append_text("Actual checksum is ") \ .append_text(self.checksum) + + +class BigqueryFullResultMatcher(BaseMatcher): + """Matcher that verifies Bigquery data with given query. + + Fetch Bigquery data with given query, compare to the expected data. + """ + + def __init__(self, project, query, data): + """Initialize BigQueryMatcher object. + Args: + project: The name (string) of the project. + query: The query (string) to perform. + data: List of tuples with the expected data. + """ + if bigquery is None: + raise ImportError( + 'Bigquery dependencies are not installed.') + if not query or not isinstance(query, str): + raise ValueError( + 'Invalid argument: query. 
Please use non-empty string') + + self.project = project + self.query = query + self.expected_data = [sorted(i) for i in data] + + def _matches(self, _): + logging.info('Start verify Bigquery data.') + # Run query + bigquery_client = bigquery.Client(project=self.project) + response = self._query_with_retry(bigquery_client) + logging.info('Read from given query (%s), total rows %d', + self.query, len(response)) + + self.actual_data = [sorted(i) for i in response] + + # Verify result + return sorted(self.expected_data) == sorted(self.actual_data) + + @retry.with_exponential_backoff( + num_retries=MAX_RETRIES, + retry_filter=retry_on_http_and_value_error) + def _query_with_retry(self, bigquery_client): + """Run Bigquery query with retry if got error http response""" + query_job = bigquery_client.query(self.query) + return [row.values() for row in query_job] + + def describe_to(self, description): + description \ + .append_text("Expected data is ") \ + .append_text(self.expected_data) + + def describe_mismatch(self, pipeline_result, mismatch_description): + mismatch_description \ + .append_text("Actual data is ") \ + .append_text(self.actual_data) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index f281d75..957c903 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -590,27 +590,6 @@ class DataflowRunner(PipelineRunner): PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) - def apply_WriteToBigQuery(self, transform, pcoll, options): - # Make sure this is the WriteToBigQuery class that we expected - if not isinstance(transform, beam.io.WriteToBigQuery): - return self.apply_PTransform(transform, pcoll, options) - standard_options = options.view_as(StandardOptions) - if standard_options.streaming: - if (transform.write_disposition == - beam.io.BigQueryDisposition.WRITE_TRUNCATE): - raise RuntimeError('Can not use write truncation mode in streaming') - return self.apply_PTransform(transform, pcoll, options) - else: - return pcoll | 'WriteToBigQuery' >> beam.io.Write( - beam.io.BigQuerySink( - transform.table_reference.tableId, - transform.table_reference.datasetId, - transform.table_reference.projectId, - transform.schema, - transform.create_disposition, - transform.write_disposition, - kms_key=transform.kms_key)) - def apply_GroupByKey(self, transform, pcoll, options): # Infer coder of parent. # diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index ba01861..9eed963 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -684,7 +684,11 @@ class PTransformWithSideInputs(PTransform): self._cached_fn = self.fn # Ensure fn and side inputs are picklable for remote execution. - self.fn = pickler.loads(pickler.dumps(self.fn)) + try: + self.fn = pickler.loads(pickler.dumps(self.fn)) + except RuntimeError: + raise RuntimeError('Unable to pickle fn %s' % self.fn) + self.args = pickler.loads(pickler.dumps(self.args)) self.kwargs = pickler.loads(pickler.dumps(self.kwargs))
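To round this out, the new BigqueryFullResultMatcher introduced in bigquery_matcher.py can be attached to an integration test in the same way the file-loads ITs above do it; a small, hypothetical sketch (project, query, and expected rows are illustrative):

    # Minimal sketch, not part of the commit; names and data are hypothetical.
    from hamcrest.core.core.allof import all_of

    from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
    from apache_beam.testing.test_pipeline import TestPipeline

    matcher = BigqueryFullResultMatcher(
        project='myproject',
        query='SELECT name, language FROM mydataset.output_table',
        data=[('beam', 'py'), ('flink', 'java')])

    test_pipeline = TestPipeline(is_integration_test=True)
    args = test_pipeline.get_full_options_as_args(
        on_success_matcher=all_of(matcher))
    # The pipeline under test is then constructed with these args, e.g.
    # `with beam.Pipeline(argv=args) as p: ...`, and the matcher runs on success.

Unlike the existing checksum-based BigqueryMatcher, this matcher compares the full query result against the expected rows.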