This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cdea885  [BEAM-6553] A Python SDK sink that supports File Loads into BQ (#7655)
cdea885 is described below

commit cdea885872b3be7de9ba22f22700be89f7d53766
Author: Pablo <pabl...@users.noreply.github.com>
AuthorDate: Sat Feb 16 18:58:36 2019 -0800

    [BEAM-6553] A Python SDK sink that supports File Loads into BQ (#7655)
    
    * First sketch for BQ File Loads in Python SDK
    
    * Sketching multiple destinations functionality
    
    * BigQuery file loads wired up to WriteToBigQuery
    
    * Creating temporary tables for atomicity on multiple load jobs
    Improving todos, documentation
    
    * Improving logging
    
    * Fixing DirectRunner ITs
    
    * Deleting temporary tables and testing corner cases
    
    * Fixing lint issue
    
    * Removing one integration test. Addressing comments.
    
    * Rewindowing input
    
    * Managing the limit of files per job
---
 .../io/gcp/big_query_query_to_table_it_test.py     |   7 +
 .../io/gcp/big_query_query_to_table_pipeline.py    |   7 +-
 sdks/python/apache_beam/io/gcp/bigquery.py         |  97 +++-
 .../apache_beam/io/gcp/bigquery_file_loads.py      | 601 +++++++++++++++++++++
 .../apache_beam/io/gcp/bigquery_file_loads_test.py | 462 ++++++++++++++++
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   |  92 +++-
 .../apache_beam/io/gcp/tests/bigquery_matcher.py   |  58 +-
 .../runners/dataflow/dataflow_runner.py            |  21 -
 sdks/python/apache_beam/transforms/ptransform.py   |   6 +-
 9 files changed, 1296 insertions(+), 55 deletions(-)
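
For orientation, a minimal usage sketch of the new batch load path (illustrative only, not part of the patch; project, dataset, table and bucket names are hypothetical). In a batch pipeline, WriteToBigQuery now expands to BigQueryBatchFileLoads: rows are written to files under gs_location (or the pipeline's temp_location) and imported with BigQuery load jobs.

    import apache_beam as beam

    with beam.Pipeline() as p:
      _ = (p
           | beam.Create([{'name': 'beam', 'language': 'py'}])
           | beam.io.WriteToBigQuery(
               'myproject:mydataset.mytable',   # hypothetical table
               schema='name:STRING,language:STRING',
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
               # hypothetical bucket; temp_location is used when omitted
               gs_location='gs://my-temp-bucket/bq_loads'))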

diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index 43db185..8980261 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -128,8 +128,11 @@ class BigQueryQueryToTableIT(unittest.TestCase):
         project=self.project,
         query=verify_query,
         checksum=expected_checksum)]
+
+    gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table
     extra_opts = {'query': LEGACY_QUERY,
                   'output': self.output_table,
+                  'gs_location': gs_location,
                   'output_schema': DIALECT_OUTPUT_SCHEMA,
                   'use_standard_sql': False,
                   'on_success_matcher': all_of(*pipeline_verifiers)}
@@ -144,8 +147,10 @@ class BigQueryQueryToTableIT(unittest.TestCase):
         project=self.project,
         query=verify_query,
         checksum=expected_checksum)]
+    gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table
     extra_opts = {'query': STANDARD_QUERY,
                   'output': self.output_table,
+                  'gs_location': gs_location,
                   'output_schema': DIALECT_OUTPUT_SCHEMA,
                   'use_standard_sql': True,
                   'on_success_matcher': all_of(*pipeline_verifiers)}
@@ -186,9 +191,11 @@ class BigQueryQueryToTableIT(unittest.TestCase):
         query=verify_query,
         checksum=expected_checksum)]
     self._setup_new_types_env()
+    gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table
     extra_opts = {
         'query': NEW_TYPES_QUERY % (self.dataset_id, NEW_TYPES_INPUT_TABLE),
         'output': self.output_table,
+        'gs_location': gs_location,
         'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
         'use_standard_sql': False,
         'on_success_matcher': all_of(*pipeline_verifiers)}
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
index 26b418a..b35f4e5 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
@@ -50,6 +50,9 @@ def run_bq_pipeline(argv=None):
                       help='Output BQ table to write results to.')
   parser.add_argument('--kms_key', default=None,
                       help='Use this Cloud KMS key with BigQuery.')
+  parser.add_argument('--gs_location',
+                      default=None,
+                      help='GCS bucket location to use to store files.')
   known_args, pipeline_args = parser.parse_known_args(argv)
 
   table_schema = parse_table_schema_from_json(known_args.output_schema)
@@ -62,12 +65,12 @@ def run_bq_pipeline(argv=None):
   (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
       query=known_args.query, use_standard_sql=known_args.use_standard_sql,
       kms_key=kms_key))
-   | 'write' >> beam.io.Write(beam.io.BigQuerySink(
+   | 'write' >> beam.io.WriteToBigQuery(
            known_args.output,
            schema=table_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
-           kms_key=known_args.kms_key)))
+           gs_location=known_args.gs_location))
 
   result = p.run()
   result.wait_until_finish()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 8f3011b..b1c1f90 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -134,6 +134,7 @@ from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms import DoFn
 from apache_beam.transforms import ParDo
@@ -449,6 +450,10 @@ bigquery_v2_messages.TableSchema` object.
         match the expected format.
     """
 
+    import warnings
+    warnings.warn("This class is deprecated and will be permanently removed "
+                  "in a future version of beam.")
+
     # Import here to avoid adding the dependency for local running scenarios.
     try:
       # pylint: disable=wrong-import-order, wrong-import-position
@@ -646,18 +651,32 @@ class BigQueryWriteFn(DoFn):
 
 class WriteToBigQuery(PTransform):
 
-  def __init__(self, table, dataset=None, project=None, schema=None,
+  def __init__(self,
+               table,
+               dataset=None,
+               project=None,
+               schema=None,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=BigQueryDisposition.WRITE_APPEND,
-               batch_size=None, kms_key=None, test_client=None):
+               kms_key=None,
+               batch_size=None,
+               max_file_size=None,
+               max_files_per_bundle=None,
+               test_client=None,
+               gs_location=None):
     """Initialize a WriteToBigQuery transform.
 
     Args:
-      table (str): The ID of the table. The ID must contain only letters
-        ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If dataset
-        argument is :data:`None` then the table argument must contain the
-        entire table reference specified as: ``'DATASET.TABLE'`` or
-        ``'PROJECT:DATASET.TABLE'``.
+      table (str, callable): The ID of the table, or a callable
+         that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
+         numbers ``0-9``, or underscores ``_``. If dataset argument is
+         :data:`None` then the table argument must contain the entire table
+         reference specified as: ``'DATASET.TABLE'``
+         or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
+         argument representing an element to be written to BigQuery, and return
+         a TableReference, or a string table name as specified above.
+         Multiple destinations are only supported on Batch pipelines at the
+         moment.
       dataset (str): The ID of the dataset containing this table or
         :data:`None` if the table reference is specified entirely by the table
         argument.
@@ -691,12 +710,23 @@ bigquery_v2_messages.TableSchema`
           empty.
 
         For streaming pipelines WriteTruncate can not be used.
-
-      batch_size (int): Number of rows to be written to BQ per streaming API
-        insert.
       kms_key (str): Experimental. Optional Cloud KMS key name for use when
         creating new tables.
+      batch_size (int): Number of rows to be written to BQ per streaming API
+        insert. The default is 500.
       test_client: Override the default bigquery client used for testing.
+      max_file_size (int): The maximum size for a file to be written and then
+        loaded into BigQuery. The default value is 4TB, which is 80% of the
+        limit of 5TB for BigQuery to load any file.
+      max_files_per_bundle(int): The maximum number of files to be concurrently
+        written by a worker. The default here is 20. Larger values will allow
+        writing to multiple destinations without having to reshard - but they
+        increase the memory burden on the workers.
+      gs_location (str): A GCS location to store files to be used for file
+        loads into BigQuery. By default, this will use the pipeline's
+        temp_location, but for pipelines whose temp_location is not appropriate
+        for BQ File Loads, users should pass a specific one.
     """
     self.table_reference = bigquery_tools.parse_table_reference(
         table, dataset, project)
@@ -708,6 +738,9 @@ bigquery_v2_messages.TableSchema`
     self.batch_size = batch_size
     self.kms_key = kms_key
     self.test_client = test_client
+    self.gs_location = gs_location
+    self.max_file_size = max_file_size
+    self.max_files_per_bundle = max_files_per_bundle
 
   @staticmethod
   def get_table_schema_from_string(schema):
@@ -788,20 +821,40 @@ bigquery_v2_messages.TableSchema):
       raise TypeError('Unexpected schema argument: %s.' % schema)
 
   def expand(self, pcoll):
-    if self.table_reference.projectId is None:
+    p = pcoll.pipeline
+
+    # TODO(pabloem): Use a different method to determine if streaming or batch.
+    standard_options = p.options.view_as(StandardOptions)
+
+    if (not callable(self.table_reference)
+        and self.table_reference.projectId is None):
       self.table_reference.projectId = pcoll.pipeline.options.view_as(
           GoogleCloudOptions).project
-    bigquery_write_fn = BigQueryWriteFn(
-        table_id=self.table_reference.tableId,
-        dataset_id=self.table_reference.datasetId,
-        project_id=self.table_reference.projectId,
-        batch_size=self.batch_size,
-        schema=self.get_dict_table_schema(self.schema),
-        create_disposition=self.create_disposition,
-        write_disposition=self.write_disposition,
-        kms_key=self.kms_key,
-        test_client=self.test_client)
-    return pcoll | 'WriteToBigQuery' >> ParDo(bigquery_write_fn)
+
+    if standard_options.streaming:
+      # TODO: Support load jobs for streaming pipelines.
+      bigquery_write_fn = BigQueryWriteFn(
+          table_id=self.table_reference.tableId,
+          dataset_id=self.table_reference.datasetId,
+          project_id=self.table_reference.projectId,
+          batch_size=self.batch_size,
+          schema=self.get_dict_table_schema(self.schema),
+          create_disposition=self.create_disposition,
+          write_disposition=self.write_disposition,
+          kms_key=self.kms_key,
+          test_client=self.test_client)
+      return pcoll | 'WriteToBigQuery' >> ParDo(bigquery_write_fn)
+    else:
+      from apache_beam.io.gcp import bigquery_file_loads
+      return pcoll | bigquery_file_loads.BigQueryBatchFileLoads(
+          destination=self.table_reference,
+          schema=self.get_dict_table_schema(self.schema),
+          create_disposition=self.create_disposition,
+          write_disposition=self.write_disposition,
+          max_file_size=self.max_file_size,
+          max_files_per_bundle=self.max_files_per_bundle,
+          gs_location=self.gs_location,
+          test_client=self.test_client)
 
   def display_data(self):
     res = {}
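
To illustrate the callable-destination behaviour described in the docstring above, a sketch mirroring the integration test further down (illustrative only, not part of the patch; table names are hypothetical, and multiple destinations are batch-only at this point):

    import apache_beam as beam

    def route(row):
      # Per-element destination: a 'PROJECT:DATASET.TABLE' string (or a
      # TableReference). Table names here are hypothetical.
      return ('myproject:logs.errors' if row.get('severity') == 'ERROR'
              else 'myproject:logs.other')

    with beam.Pipeline() as p:
      _ = (p
           | beam.Create([{'severity': 'ERROR', 'message': 'boom'},
                          {'severity': 'INFO', 'message': 'ok'}])
           | beam.io.WriteToBigQuery(
               table=route,
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))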
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
new file mode 100644
index 0000000..96ba2ab
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -0,0 +1,601 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Functionality to perform file loads into BigQuery for Batch and Streaming
+pipelines.
+
+This sink is able to work around BigQuery load quotas and limitations. When
+destinations are dynamic, or when data for a single job is too large, the data
+will be split into multiple jobs.
+
+NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
+"""
+
+from __future__ import absolute_import
+
+import datetime
+import hashlib
+import itertools
+import logging
+import random
+import time
+import uuid
+
+from future.utils import iteritems
+
+import apache_beam as beam
+from apache_beam import pvalue
+from apache_beam.io import filesystems as fs
+from apache_beam.io.gcp import bigquery_tools
+from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api
+from apache_beam.options import value_provider as vp
+from apache_beam.transforms.combiners import Count
+
+ONE_TERABYTE = (1 << 40)
+
+# The maximum file size for imports is 5TB. We keep our files under that.
+_DEFAULT_MAX_FILE_SIZE = 4 * ONE_TERABYTE
+
+_DEFAULT_MAX_WRITERS_PER_BUNDLE = 20
+
+# The maximum size for a single load job is 15 terabytes.
+_MAXIMUM_LOAD_SIZE = 15 * ONE_TERABYTE
+
+# BigQuery only supports up to 10,000 URIs for a single load job.
+_MAXIMUM_SOURCE_URIS = 10*1000
+
+
+def _generate_load_job_name():
+  datetime_component = datetime.datetime.now().strftime("%Y_%m_%d_%H%M%S")
+  # TODO(pabloem): include job id / pipeline component?
+  return 'beam_load_%s_%s' % (datetime_component, random.randint(0, 100))
+
+
+def _generate_file_prefix(pipeline_gcs_location):
+  # If a gcs location is provided to the pipeline, then we shall use that.
+  # Otherwise, we shall use the temp_location from pipeline options.
+  gcs_base = str(pipeline_gcs_location or
+                 vp.RuntimeValueProvider.get_value('temp_location', str, ''))
+  prefix_uuid = _bq_uuid()
+  return fs.FileSystems.join(gcs_base, 'bq_load', prefix_uuid)
+
+
+def _make_new_file_writer(file_prefix, destination):
+  if isinstance(destination, bigquery_api.TableReference):
+    destination = '%s:%s.%s' % (
+        destination.projectId, destination.datasetId, destination.tableId)
+
+  directory = fs.FileSystems.join(file_prefix, destination)
+
+  if not fs.FileSystems.exists(directory):
+    fs.FileSystems.mkdirs(directory)
+
+  file_name = str(uuid.uuid4())
+  file_path = fs.FileSystems.join(file_prefix, destination, file_name)
+
+  return file_path, fs.FileSystems.create(file_path, 'application/text')
+
+
+def _bq_uuid(seed=None):
+  if not seed:
+    return str(uuid.uuid4()).replace("-", "")
+  else:
+    return str(hashlib.md5(seed).hexdigest())
+
+
+class _AppendDestinationsFn(beam.DoFn):
+  """Adds the destination to an element, making it a KV pair.
+
+  Outputs a PCollection of KV-pairs where the key is a TableReference for the
+  destination, and the value is the record itself.
+
+  Experimental; no backwards compatibility guarantees.
+  """
+
+  def __init__(self, destination):
+    if callable(destination):
+      self.destination = destination
+    else:
+      self.destination = lambda x: destination
+
+  def process(self, element):
+    yield (self.destination(element), element)
+
+
+class _ShardDestinations(beam.DoFn):
+  """Adds a shard number to the key of the KV element.
+
+  Experimental; no backwards compatibility guarantees."""
+  DEFAULT_SHARDING_FACTOR = 10
+
+  def __init__(self, sharding_factor=DEFAULT_SHARDING_FACTOR):
+    self.sharding_factor = sharding_factor
+
+  def start_bundle(self):
+    self._shard_count = random.randrange(self.sharding_factor)
+
+  def process(self, element):
+    destination = element[0]
+    row = element[1]
+
+    sharded_destination = (destination,
+                           self._shard_count % self.sharding_factor)
+    self._shard_count += 1
+    yield (sharded_destination, row)
+
+
+class WriteRecordsToFile(beam.DoFn):
+  """Write input records to files before triggering a load job.
+
+  This transform keeps up to ``max_files_per_bundle`` files open to write to. It
+  receives (destination, record) tuples, and it writes the records to different
+  files for each destination.
+
+  If there are more than ``max_files_per_bundle`` destinations that we need to
+  write to, then those records are grouped by their destination, and later
+  written to files by ``WriteGroupedRecordsToFile``.
+
+  It outputs two PCollections.
+  """
+
+  UNWRITTEN_RECORD_TAG = 'UnwrittenRecords'
+  WRITTEN_FILE_TAG = 'WrittenFiles'
+
+  def __init__(self,
+               max_files_per_bundle=_DEFAULT_MAX_WRITERS_PER_BUNDLE,
+               max_file_size=_DEFAULT_MAX_FILE_SIZE,
+               coder=None):
+    """Initialize a :class:`WriteRecordsToFile`.
+
+    Args:
+      max_files_per_bundle (int): The maximum number of files that can be kept
+        open during execution of this step in a worker. This is to avoid
+        overwhelming the worker memory.
+      max_file_size (int): The maximum size in bytes for a file to be used in
+        a load job.
+
+    """
+    self.max_files_per_bundle = max_files_per_bundle
+    self.max_file_size = max_file_size
+    self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
+
+  def display_data(self):
+    return {
+        'max_files_per_bundle': self.max_files_per_bundle,
+        'max_file_size': str(self.max_file_size),
+        'coder': self.coder.__class__.__name__
+    }
+
+  def start_bundle(self):
+    self._destination_to_file_writer = {}
+
+  def process(self, element, file_prefix):
+    destination = element[0]
+    row = element[1]
+
+    if destination in self._destination_to_file_writer:
+      writer = self._destination_to_file_writer[destination]
+    elif len(self._destination_to_file_writer) < self.max_files_per_bundle:
+      (file_path, writer) = _make_new_file_writer(file_prefix, destination)
+      self._destination_to_file_writer[destination] = writer
+      yield pvalue.TaggedOutput(WriteRecordsToFile.WRITTEN_FILE_TAG,
+                                (destination, file_path))
+    else:
+      yield pvalue.TaggedOutput(
+          WriteRecordsToFile.UNWRITTEN_RECORD_TAG, element)
+      return
+
+    # TODO(pabloem): Is it possible for this to throw exception?
+    writer.write(self.coder.encode(row))
+    writer.write(b'\n')
+
+    if writer.tell() > self.max_file_size:
+      writer.close()
+      self._destination_to_file_writer.pop(destination)
+
+  def finish_bundle(self):
+    for _, writer in iteritems(self._destination_to_file_writer):
+      writer.close()
+    self._destination_to_file_writer = {}
+
+
+class WriteGroupedRecordsToFile(beam.DoFn):
+  """Receives collection of dest-iterable(records), writes it to files.
+
+  This is different from ``WriteRecordsToFile`` because it receives records
+  grouped by destination. This means that it's not necessary to keep multiple
+  file descriptors open, because we know for sure when records for a single
+  destination have been written out.
+
+  Experimental; no backwards compatibility guarantees.
+  """
+
+  def __init__(self, max_file_size=_DEFAULT_MAX_FILE_SIZE,
+               coder=None):
+    self.max_file_size = max_file_size
+    self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
+
+  def process(self, element, file_prefix):
+    destination = element[0]
+    rows = element[1]
+
+    writer = None
+
+    for row in rows:
+      if writer is None:
+        (file_path, writer) = _make_new_file_writer(file_prefix, destination)
+        yield (destination, file_path)
+
+      writer.write(self.coder.encode(row))
+      writer.write(b'\n')
+
+      if writer.tell() > self.max_file_size:
+        writer.close()
+        writer = None
+
+
+class TriggerCopyJobs(beam.DoFn):
+  """Launches jobs to copy from temporary tables into the main target table.
+
+  When a job needs to write to multiple destination tables, or when a single
+  destination table needs to have multiple load jobs to write to it, files are
+  loaded into temporary tables, and those tables are later copied to the
+  destination tables.
+
+  This transform emits (destination, job_reference) pairs.
+  """
+  def __init__(self,
+               create_disposition=None,
+               write_disposition=None,
+               test_client=None,
+               temporary_tables=False):
+    self.create_disposition = create_disposition
+    self.write_disposition = write_disposition
+    self.test_client = test_client
+    self.temporary_tables = temporary_tables
+
+  def start_bundle(self):
+    self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
+
+  def process(self, element, job_name_prefix=None):
+    destination = element[0]
+    job_reference = element[1]
+
+    if not self.temporary_tables:
+      # If we did not use temporary tables, then we do not need to trigger any
+      # copy jobs.
+      return
+
+    copy_to_reference = bigquery_tools.parse_table_reference(destination)
+    if copy_to_reference.projectId is None:
+      copy_to_reference.projectId = vp.RuntimeValueProvider.get_value(
+          'project', str, '')
+
+    copy_from_reference = bigquery_tools.parse_table_reference(destination)
+    copy_from_reference.tableId = job_reference.jobId
+    if copy_from_reference.projectId is None:
+      copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
+          'project', str, '')
+
+    copy_job_name = '%s_copy_%s_to_%s' % (
+        job_name_prefix,
+        _bq_uuid('%s:%s.%s' % (copy_from_reference.projectId,
+                               copy_from_reference.datasetId,
+                               copy_from_reference.tableId)),
+        _bq_uuid('%s:%s.%s' % (copy_to_reference.projectId,
+                               copy_to_reference.datasetId,
+                               copy_to_reference.tableId)))
+
+    logging.info("Triggering copy job from %s to %s",
+                 copy_from_reference, copy_to_reference)
+    job_reference = self.bq_wrapper._insert_copy_job(
+        copy_to_reference.projectId,
+        copy_job_name,
+        copy_from_reference,
+        copy_to_reference,
+        create_disposition=self.create_disposition,
+        write_disposition=self.write_disposition)
+
+    yield (destination, job_reference)
+
+
+class TriggerLoadJobs(beam.DoFn):
+  """Triggers the import jobs to BQ.
+
+  Experimental; no backwards compatibility guarantees.
+  """
+
+  TEMP_TABLES = 'TemporaryTables'
+
+  def __init__(self,
+               schema=None,
+               create_disposition=None,
+               write_disposition=None,
+               test_client=None,
+               temporary_tables=False):
+    self.schema = schema
+    self.test_client = test_client
+    self.temporary_tables = temporary_tables
+    if self.temporary_tables:
+      # If we are loading into temporary tables, we rely on the default create
+      # and write dispositions, which mean that a new table will be created.
+      self.create_disposition = None
+      self.write_disposition = None
+    else:
+      self.create_disposition = create_disposition
+      self.write_disposition = write_disposition
+
+  def display_data(self):
+    result = {'create_disposition': str(self.create_disposition),
+              'write_disposition': str(self.write_disposition)}
+    if self.schema is not None:
+      result['schema'] = str(self.schema)
+    else:
+      result['schema'] = 'AUTODETECT'
+
+    return result
+
+  def start_bundle(self):
+    self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
+
+  def process(self, element, load_job_name_prefix):
+    destination = element[0]
+    files = iter(element[1])
+
+    job_count = 0
+    batch_of_files = list(itertools.islice(files, _MAXIMUM_SOURCE_URIS))
+    while batch_of_files:
+
+      table_reference = bigquery_tools.parse_table_reference(destination)
+      if table_reference.projectId is None:
+        table_reference.projectId = vp.RuntimeValueProvider.get_value(
+            'project', str, '')
+
+      # Load jobs for a single destination are always triggered from the same
+      # worker. This means that we can generate a deterministic numbered job
+      # id, and do not need to worry about name collisions.
+      job_name = '%s_%s_%s' % (
+          load_job_name_prefix,
+          _bq_uuid('%s:%s.%s' % (table_reference.projectId,
+                                 table_reference.datasetId,
+                                 table_reference.tableId)),
+          job_count)
+      logging.debug("Batch of files has %s files. Job name is %s",
+                    len(batch_of_files), job_name)
+
+      if self.temporary_tables:
+        # For temporary tables, we create a new table named after the job id.
+        table_reference.tableId = job_name
+        yield pvalue.TaggedOutput(TriggerLoadJobs.TEMP_TABLES, table_reference)
+
+      logging.info("Triggering job %s to load data to BigQuery table %s.",
+                   job_name, table_reference)
+      job_reference = self.bq_wrapper.perform_load_job(
+          table_reference, batch_of_files, job_name,
+          schema=self.schema,
+          write_disposition=self.write_disposition,
+          create_disposition=self.create_disposition)
+      yield (destination, job_reference)
+
+      # Prepare to trigger the next job
+      job_count += 1
+      batch_of_files = list(itertools.islice(files, _MAXIMUM_SOURCE_URIS))
+
+
+class WaitForBQJobs(beam.DoFn):
+  """Takes in a series of BQ job names as side input, and waits for all of 
them.
+
+  If any job fails, it will fail. If all jobs succeed, it will succeed.
+
+  Experimental; no backwards compatibility guarantees.
+  """
+  ALL_DONE = object()
+  FAILED = object()
+  WAITING = object()
+
+  def __init__(self, test_client):
+    self.test_client = test_client
+
+  def start_bundle(self):
+    self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
+
+  def process(self, element, dest_ids_list):
+    job_references = [elm[1] for elm in dest_ids_list]
+
+    while True:
+      status = self._check_job_states(job_references)
+      if status == WaitForBQJobs.FAILED:
+        raise Exception(
+            'BigQuery jobs failed. BQ error: %s' % self._latest_error)
+      elif status == WaitForBQJobs.ALL_DONE:
+        return dest_ids_list  # Pass the list of destination-jobs downstream
+      time.sleep(10)
+
+  def _check_job_states(self, job_references):
+    for ref in job_references:
+      job = self.bq_wrapper.get_job(ref.projectId,
+                                    ref.jobId,
+                                    ref.location)
+
+      logging.info("Job status: %s", job.status)
+      if job.status.state == 'DONE' and job.status.errorResult:
+        logging.warn("Job %s seems to have failed. Error Result: %s",
+                     ref.jobId, job.status.errorResult)
+        self._latest_error = job.status
+        return WaitForBQJobs.FAILED
+      elif job.status.state == 'DONE':
+        continue
+
+    return WaitForBQJobs.ALL_DONE
+
+
+class DeleteTablesFn(beam.DoFn):
+  def __init__(self, test_client=None):
+    self.test_client = test_client
+
+  def start_bundle(self):
+    self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
+
+  def process(self, table_reference):
+    logging.info("Deleting table %s", table_reference)
+    table_reference = bigquery_tools.parse_table_reference(table_reference)
+    self.bq_wrapper._delete_table(
+        table_reference.projectId,
+        table_reference.datasetId,
+        table_reference.tableId)
+
+
+class BigQueryBatchFileLoads(beam.PTransform):
+  """Takes in a set of elements, and inserts them to BigQuery via batch loads.
+
+  """
+
+  DESTINATION_JOBID_PAIRS = 'destination_load_jobid_pairs'
+  DESTINATION_FILE_PAIRS = 'destination_file_pairs'
+  DESTINATION_COPY_JOBID_PAIRS = 'destination_copy_jobid_pairs'
+
+  def __init__(
+      self,
+      destination,
+      schema=None,
+      gs_location=None,
+      create_disposition=None,
+      write_disposition=None,
+      coder=None,
+      max_file_size=None,
+      max_files_per_bundle=None,
+      test_client=None):
+    self.destination = destination
+    self.create_disposition = create_disposition
+    self.write_disposition = write_disposition
+    self.max_file_size = max_file_size or _DEFAULT_MAX_FILE_SIZE
+    self.max_files_per_bundle = (max_files_per_bundle or
+                                 _DEFAULT_MAX_WRITERS_PER_BUNDLE)
+    self._input_gs_location = gs_location
+    self.test_client = test_client
+    self.schema = schema
+    self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
+
+    # If we have multiple destinations, then we will have multiple load jobs,
+    # thus we will need temporary tables for atomicity.
+    # If the destination is a single one, we assume that we will have only one
+    # job to run - and thus we avoid using temporary tables
+    self.temp_tables = True if callable(destination) else False
+
+  def expand(self, pcoll):
+    p = pcoll.pipeline
+
+    load_job_name_pcv = pvalue.AsSingleton(
+        p
+        | "ImpulseJobName" >> beam.Create([None])
+        | beam.Map(lambda _: _generate_load_job_name()))
+
+    file_prefix_pcv = pvalue.AsSingleton(
+        p
+        | "CreateFilePrefixView" >> beam.Create([self._input_gs_location])
+        | "GenerateFilePrefix" >> beam.Map(_generate_file_prefix))
+
+    outputs = (
+        pcoll
+        | "ApplyGlobalWindow" >> beam.WindowInto(beam.window.GlobalWindows())
+        | "AppendDestination" >> beam.ParDo(_AppendDestinationsFn(
+            self.destination))
+        | beam.ParDo(
+            WriteRecordsToFile(max_files_per_bundle=self.max_files_per_bundle,
+                               max_file_size=self.max_file_size,
+                               coder=self.coder),
+            file_prefix=file_prefix_pcv).with_outputs(
+                WriteRecordsToFile.UNWRITTEN_RECORD_TAG,
+                WriteRecordsToFile.WRITTEN_FILE_TAG))
+
+    # A PCollection of (destination, file) tuples. It lists files with records,
+    # and the destination each file is meant to be imported into.
+    destination_files_kv_pc = outputs[WriteRecordsToFile.WRITTEN_FILE_TAG]
+
+    # A PCollection of (destination, record) tuples. These are later sharded,
+    # grouped, and all records for each destination-shard are written to files.
+    # This PCollection is necessary because not all records can be written into
+    # files in ``WriteRecordsToFile``.
+    unwritten_records_pc = outputs[WriteRecordsToFile.UNWRITTEN_RECORD_TAG]
+
+    more_destination_files_kv_pc = (
+        unwritten_records_pc
+        | beam.ParDo(_ShardDestinations())
+        | "GroupShardedRows" >> beam.GroupByKey()
+        | "DropShardNumber" >> beam.Map(lambda x: (x[0][0], x[1]))
+        | "WriteGroupedRecordsToFile" >> beam.ParDo(WriteGroupedRecordsToFile(
+            coder=self.coder), file_prefix=file_prefix_pcv)
+    )
+
+    all_destination_file_pairs_pc = (
+        (destination_files_kv_pc, more_destination_files_kv_pc)
+        | "DestinationFilesUnion" >> beam.Flatten())
+
+    grouped_files_pc = (
+        all_destination_file_pairs_pc
+        | "GroupFilesByTableDestinations" >> beam.GroupByKey())
+
+    # Load jobs are triggered to temporary tables, and those tables are later
+    # copied to the appropriate destination tables. This ensures atomicity when
+    # some load jobs fail and others do not.
+    # If any of them fails, then copy jobs are not triggered.
+    trigger_loads_outputs = (
+        grouped_files_pc | beam.ParDo(TriggerLoadJobs(
+            schema=self.schema,
+            write_disposition=self.write_disposition,
+            create_disposition=self.create_disposition,
+            test_client=self.test_client,
+            temporary_tables=self.temp_tables), load_job_name_pcv).with_outputs(
+                TriggerLoadJobs.TEMP_TABLES, main='main')
+    )
+
+    destination_job_ids_pc = trigger_loads_outputs['main']
+    temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]
+
+    destination_copy_job_ids_pc = (
+        p
+        | "ImpulseMonitorLoadJobs" >> beam.Create([None])
+        | "WaitForLoadJobs" >> beam.ParDo(
+            WaitForBQJobs(self.test_client),
+            beam.pvalue.AsList(destination_job_ids_pc))
+        | beam.ParDo(TriggerCopyJobs(
+            create_disposition=self.create_disposition,
+            write_disposition=self.write_disposition,
+            temporary_tables=self.temp_tables,
+            test_client=self.test_client), load_job_name_pcv))
+
+    finished_copy_jobs_pc = (p
+                             | "ImpulseMonitorCopyJobs" >> beam.Create([None])
+                             | "WaitForCopyJobs" >> beam.ParDo(
+                                 WaitForBQJobs(self.test_client),
+                                 beam.pvalue.AsList(destination_copy_job_ids_pc)
+                             ))
+
+    _ = (finished_copy_jobs_pc
+         | "RemoveTempTables/PassTables" >> beam.FlatMap(
+             lambda x, deleting_tables: deleting_tables,
+             pvalue.AsIter(temp_tables_pc))
+         | "RemoveTempTables/DeduplicateTables" >> Count.PerElement()
+         | "RemoveTempTables/GetTableNames" >> beam.Map(lambda elm: elm[0])
+         | "RemoveTempTables/Delete" >> beam.ParDo(DeleteTablesFn()))
+
+    return {
+        self.DESTINATION_JOBID_PAIRS: destination_job_ids_pc,
+        self.DESTINATION_FILE_PAIRS: all_destination_file_pairs_pc,
+        self.DESTINATION_COPY_JOBID_PAIRS: destination_copy_job_ids_pc,
+    }
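
For reference, a sketch of applying the new transform directly and inspecting its outputs (illustrative only; the transform is experimental and is normally reached through WriteToBigQuery; names are hypothetical, and omitting the schema relies on load-job autodetection):

    import apache_beam as beam
    from apache_beam.io.gcp import bigquery_file_loads as bqfl

    with beam.Pipeline() as p:
      outputs = (p
                 | beam.Create([{'name': 'beam', 'language': 'py'}])
                 | bqfl.BigQueryBatchFileLoads(
                     destination='myproject:mydataset.mytable',
                     gs_location='gs://my-temp-bucket/bq_loads'))
      # expand() returns a dict of PCollections keyed by the class constants.
      load_jobs = outputs[bqfl.BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS]
      files = outputs[bqfl.BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS]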
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
new file mode 100644
index 0000000..6a1166d
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -0,0 +1,462 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for BigQuery file loads utilities."""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import random
+import time
+import unittest
+
+import mock
+from hamcrest.core import assert_that as hamcrest_assert
+from hamcrest.core.core.allof import all_of
+from hamcrest.core.core.is_ import is_
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
+from apache_beam.io.gcp import bigquery_file_loads as bqfl
+from apache_beam.io.gcp import bigquery
+from apache_beam.io.gcp import bigquery_tools
+from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  HttpError = None
+
+
+_DESTINATION_ELEMENT_PAIRS = [
+    # DESTINATION 1
+    ('project1:dataset1.table1', '{"name":"beam", "language":"py"}'),
+    ('project1:dataset1.table1', '{"name":"beam", "language":"java"}'),
+    ('project1:dataset1.table1', '{"name":"beam", "language":"go"}'),
+    ('project1:dataset1.table1', '{"name":"flink", "language":"java"}'),
+    ('project1:dataset1.table1', '{"name":"flink", "language":"scala"}'),
+
+    # DESTINATION 3
+    ('project1:dataset1.table3', '{"name":"spark", "language":"scala"}'),
+
+    # DESTINATION 1
+    ('project1:dataset1.table1', '{"name":"spark", "language":"py"}'),
+    ('project1:dataset1.table1', '{"name":"spark", "language":"scala"}'),
+
+    # DESTINATION 2
+    ('project1:dataset1.table2', '{"name":"beam", "foundation":"apache"}'),
+    ('project1:dataset1.table2', '{"name":"flink", "foundation":"apache"}'),
+    ('project1:dataset1.table2', '{"name":"spark", "foundation":"apache"}'),
+]
+
+_NAME_LANGUAGE_ELEMENTS = [
+    json.loads(elm[1])
+    for elm in _DESTINATION_ELEMENT_PAIRS if "language" in elm[1]
+]
+
+
+_DISTINCT_DESTINATIONS = list(
+    set([elm[0] for elm in _DESTINATION_ELEMENT_PAIRS]))
+
+
+_ELEMENTS = list([json.loads(elm[1]) for elm in _DESTINATION_ELEMENT_PAIRS])
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
+  maxDiff = None
+
+  def _consume_input(self, fn, checks=None):
+    if checks is None:
+      return
+
+    with TestPipeline() as p:
+      output_pcs = (
+          p
+          | beam.Create(_DESTINATION_ELEMENT_PAIRS)
+          | beam.ParDo(fn, self.tmpdir)
+          .with_outputs(fn.WRITTEN_FILE_TAG, fn.UNWRITTEN_RECORD_TAG))
+
+      checks(output_pcs)
+      return output_pcs
+
+  def test_files_created(self):
+    """Test that the files are created and written."""
+
+    fn = bqfl.WriteRecordsToFile()
+    self.tmpdir = self._new_tempdir()
+
+    def check_files_created(output_pcs):
+      dest_file_pc = output_pcs[bqfl.WriteRecordsToFile.WRITTEN_FILE_TAG]
+
+      files = dest_file_pc | "GetFiles" >> beam.Map(lambda x: x[1])
+      file_count = files | "CountFiles" >> beam.combiners.Count.Globally()
+
+      _ = files | "FilesExist" >> beam.Map(
+          lambda x: hamcrest_assert(os.path.exists(x), is_(True)))
+      assert_that(file_count, equal_to([3]), label='check file count')
+
+      destinations = dest_file_pc | "GetDests" >> beam.Map(lambda x: x[0])
+      assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)),
+                  label='check destinations ')
+
+    self._consume_input(fn, check_files_created)
+
+  def test_many_files(self):
+    """Forces records to be written to many files.
+
+    For each destination multiple files are necessary. This is because the max
+    file length is very small, so only a couple records fit in each file.
+    """
+
+    fn = bqfl.WriteRecordsToFile(max_file_size=50)
+    self.tmpdir = self._new_tempdir()
+
+    def check_many_files(output_pcs):
+      dest_file_pc = output_pcs[bqfl.WriteRecordsToFile.WRITTEN_FILE_TAG]
+
+      files_per_dest = (dest_file_pc
+                        | beam.Map(lambda x: x).with_output_types(
+                            beam.typehints.KV[str, str])
+                        | beam.combiners.Count.PerKey())
+      assert_that(files_per_dest,
+                  equal_to([('project1:dataset1.table1', 4),
+                            ('project1:dataset1.table2', 2),
+                            ('project1:dataset1.table3', 1)]))
+
+      # Check that the files exist
+      _ = dest_file_pc | beam.Map(lambda x: x[1]) | beam.Map(
+          lambda x: hamcrest_assert(os.path.exists(x), is_(True)))
+
+    self._consume_input(fn, check_many_files)
+
+  def test_records_are_spilled(self):
+    """Forces records to be written to many files.
+
+    For each destination multiple files are necessary, and at most two files can
+    be created. This forces records to be spilled to the next stage of
+    processing.
+    """
+
+    fn = bqfl.WriteRecordsToFile(max_files_per_bundle=2)
+    self.tmpdir = self._new_tempdir()
+
+    def check_many_files(output_pcs):
+      dest_file_pc = output_pcs[bqfl.WriteRecordsToFile.WRITTEN_FILE_TAG]
+      spilled_records_pc = output_pcs[
+          bqfl.WriteRecordsToFile.UNWRITTEN_RECORD_TAG]
+
+      spilled_records_count = (spilled_records_pc |
+                               beam.combiners.Count.Globally())
+      assert_that(spilled_records_count, equal_to([3]), label='spilled count')
+
+      files_per_dest = (dest_file_pc
+                        | beam.Map(lambda x: x).with_output_types(
+                            beam.typehints.KV[str, str])
+                        | beam.combiners.Count.PerKey())
+
+      # Only table1 and table3 get files. table2 records get spilled.
+      assert_that(files_per_dest,
+                  equal_to([('project1:dataset1.table1', 1),
+                            ('project1:dataset1.table3', 1)]),
+                  label='file count')
+
+      # Check that the files exist
+      _ = dest_file_pc | beam.Map(lambda x: x[1]) | beam.Map(
+          lambda x: hamcrest_assert(os.path.exists(x), is_(True)))
+
+    self._consume_input(fn, check_many_files)
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
+
+  def _consume_input(self, fn, input, checks):
+    if checks is None:
+      return
+
+    with TestPipeline() as p:
+      res = (p
+             | beam.Create(input)
+             | beam.GroupByKey()
+             | beam.ParDo(fn, self.tmpdir))
+
+      checks(res)
+      return res
+
+  def test_files_are_created(self):
+    """Test that the files are created and written."""
+
+    fn = bqfl.WriteGroupedRecordsToFile()
+    self.tmpdir = self._new_tempdir()
+
+    def check_files_created(output_pc):
+      files = output_pc | "GetFiles" >> beam.Map(lambda x: x[1])
+      file_count = files | "CountFiles" >> beam.combiners.Count.Globally()
+
+      _ = files | "FilesExist" >> beam.Map(
+          lambda x: hamcrest_assert(os.path.exists(x), is_(True)))
+      assert_that(file_count, equal_to([3]), label='check file count')
+
+      destinations = output_pc | "GetDests" >> beam.Map(lambda x: x[0])
+      assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)),
+                  label='check destinations ')
+
+    self._consume_input(
+        fn, _DESTINATION_ELEMENT_PAIRS, check_files_created)
+
+  def test_multiple_files(self):
+    """Forces records to be written to many files.
+
+    For each destination multiple files are necessary. This is because the max
+    file length is very small, so only a couple records fit in each file.
+    """
+    fn = bqfl.WriteGroupedRecordsToFile(max_file_size=50)
+    self.tmpdir = self._new_tempdir()
+
+    def check_multiple_files(output_pc):
+      files_per_dest = output_pc | beam.combiners.Count.PerKey()
+      assert_that(files_per_dest,
+                  equal_to([('project1:dataset1.table1', 4),
+                            ('project1:dataset1.table2', 2),
+                            ('project1:dataset1.table3', 1), ]))
+
+      # Check that the files exist
+      _ = output_pc | beam.Map(lambda x: x[1]) | beam.Map(os.path.exists)
+
+    self._consume_input(fn, _DESTINATION_ELEMENT_PAIRS, check_multiple_files)
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
+
+  def test_records_traverse_transform_with_mocks(self):
+    destination = 'project1:dataset1.table1'
+
+    job_reference = bigquery_api.JobReference()
+    job_reference.projectId = 'project1'
+    job_reference.jobId = 'job_name1'
+    result_job = bigquery_api.Job()
+    result_job.jobReference = job_reference
+
+    mock_job = mock.Mock()
+    mock_job.status.state = 'DONE'
+    mock_job.status.errorResult = None
+    mock_job.jobReference = job_reference
+
+    bq_client = mock.Mock()
+    bq_client.jobs.Get.return_value = mock_job
+
+    bq_client.jobs.Insert.return_value = result_job
+
+    transform = bigquery.WriteToBigQuery(
+        destination,
+        gs_location=self._new_tempdir(),
+        test_client=bq_client)
+
+    # Need to test this with the DirectRunner to avoid serializing mocks
+    with TestPipeline('DirectRunner') as p:
+      outputs = p | beam.Create(_ELEMENTS) | transform
+
+      dest_files = outputs[bqfl.BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS]
+      dest_job = outputs[bqfl.BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS]
+
+      jobs = dest_job | "GetJobs" >> beam.Map(lambda x: x[1])
+
+      files = dest_files | "GetFiles" >> beam.Map(lambda x: x[1])
+      destinations = (dest_files
+                      | "GetUniques" >> beam.combiners.Count.PerKey()
+                      | "GetDests" >> beam.Map(lambda x: x[0]))
+
+      # All files exist
+      _ = (files | beam.Map(
+          lambda x: hamcrest_assert(os.path.exists(x), is_(True))))
+
+      # One file per destination
+      assert_that(files | beam.combiners.Count.Globally(),
+                  equal_to([1]),
+                  label='CountFiles')
+
+      assert_that(destinations,
+                  equal_to([bigquery_tools.parse_table_reference(destination)]),
+                  label='CheckDestinations')
+
+      assert_that(jobs,
+                  equal_to([job_reference]), label='CheckJobs')
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class BigQueryFileLoadsIT(unittest.TestCase):
+
+  BIG_QUERY_DATASET_ID = 'python_bq_file_loads_'
+  BIG_QUERY_SCHEMA = (
+      '{"fields": [{"name": "name","type": "STRING"},'
+      '{"name": "language","type": "STRING"}]}'
+  )
+
+  BIG_QUERY_SCHEMA_2 = (
+      '{"fields": [{"name": "name","type": "STRING"},'
+      '{"name": "foundation","type": "STRING"}]}'
+  )
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.runner_name = type(self.test_pipeline.runner).__name__
+    self.project = self.test_pipeline.get_option('project')
+
+    self.dataset_id = '%s%s%d' % (self.BIG_QUERY_DATASET_ID,
+                                  str(int(time.time())),
+                                  random.randint(0, 10000))
+    self.bigquery_client = bigquery_tools.BigQueryWrapper()
+    self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
+    self.output_table = "%s.output_table" % (self.dataset_id)
+    logging.info("Created dataset %s in project %s",
+                 self.dataset_id, self.project)
+
+  @attr('IT')
+  def test_multiple_destinations_transform(self):
+    output_table_1 = '%s%s' % (self.output_table, 1)
+    output_table_2 = '%s%s' % (self.output_table, 2)
+    output_table_3 = '%s%s' % (self.output_table, 3)
+    output_table_4 = '%s%s' % (self.output_table, 4)
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT * FROM %s" % output_table_1,
+            data=[(d['name'], d['language'])
+                  for d in _ELEMENTS
+                  if 'language' in d]),
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT * FROM %s" % output_table_2,
+            data=[(d['name'], d['foundation'])
+                  for d in _ELEMENTS
+                  if 'foundation' in d]),
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT * FROM %s" % output_table_3,
+            data=[(d['name'], d['language'])
+                  for d in _ELEMENTS
+                  if 'language' in d]),
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT * FROM %s" % output_table_4,
+            data=[(d['name'], d['foundation'])
+                  for d in _ELEMENTS
+                  if 'foundation' in d])]
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=all_of(*pipeline_verifiers))
+
+    with beam.Pipeline(argv=args) as p:
+      input = p | beam.Create(_ELEMENTS)
+
+      # Gather all input on the same machine.
+      input = (input
+               | beam.Map(lambda x: (None, x))
+               | beam.GroupByKey()
+               | beam.FlatMap(lambda elm: elm[1]))
+
+      _ = (input |
+           "WriteWithMultipleDestsFreely" >> bigquery.WriteToBigQuery(
+               table=lambda x: (output_table_1
+                                if 'language' in x
+                                else output_table_2),
+               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+               write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
+
+      _ = (input |
+           "WriteWithMultipleDests" >> bigquery.WriteToBigQuery(
+               table=lambda x: (output_table_3
+                                if 'language' in x
+                                else output_table_4),
+               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+               write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
+               max_file_size=20,
+               max_files_per_bundle=-1))
+
+  @attr('IT')
+  def test_one_job_fails_all_jobs_fail(self):
+
+    # If one of the import jobs fails, then other jobs must not be performed.
+    # This is to avoid reinsertion of some records when a pipeline fails and
+    # is rerun.
+    output_table_1 = '%s%s' % (self.output_table, 1)
+    output_table_2 = '%s%s' % (self.output_table, 2)
+
+    self.bigquery_client.get_or_create_table(
+        self.project, self.dataset_id, output_table_1.split('.')[1],
+        bigquery_tools.parse_table_schema_from_json(self.BIG_QUERY_SCHEMA),
+        None, None)
+    self.bigquery_client.get_or_create_table(
+        self.project, self.dataset_id, output_table_2.split('.')[1],
+        bigquery_tools.parse_table_schema_from_json(self.BIG_QUERY_SCHEMA_2),
+        None, None)
+
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT * FROM %s" % output_table_1,
+            data=[]),
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT * FROM %s" % output_table_2,
+            data=[])]
+
+    args = self.test_pipeline.get_full_options_as_args()
+
+    with self.assertRaises(Exception):
+      with beam.Pipeline(argv=args) as p:
+        input = p | beam.Create(_ELEMENTS)
+        input2 = p | "Broken record" >> beam.Create(['language_broken_record'])
+
+        input = (input, input2) | beam.Flatten()
+
+        _ = (input |
+             "WriteWithMultipleDests" >> bigquery.WriteToBigQuery(
+                 table=lambda x: (output_table_1
+                                  if 'language' in x
+                                  else output_table_2),
+                 create_disposition=(
+                     beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
+                 write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+
+    hamcrest_assert(p, all_of(*pipeline_verifiers))
+
+  def tearDown(self):
+    request = bigquery_api.BigqueryDatasetsDeleteRequest(
+        projectId=self.project, datasetId=self.dataset_id,
+        deleteContents=True)
+    try:
+      logging.info("Deleting dataset %s in project %s",
+                   self.dataset_id, self.project)
+      self.bigquery_client.client.datasets.Delete(request)
+    except HttpError:
+      logging.debug('Failed to clean up dataset %s in project %s',
+                    self.dataset_id, self.project)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 93d5299..2f4fd52 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -21,6 +21,8 @@ Classes, constants and functions in this file are experimental and have no
 backwards compatibility guarantees.
 
 These tools include wrappers and clients to interact with BigQuery APIs.
+
+NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
 """
 
 from __future__ import absolute_import
@@ -127,7 +129,7 @@ def parse_table_reference(table, dataset=None, project=None):
       argument.
 
   Returns:
-    A bigquery.TableReference object.
+    A TableReference for the table name that was provided.
 
   Raises:
     ValueError: if the table reference as a string does not match the expected
@@ -136,6 +138,8 @@ def parse_table_reference(table, dataset=None, project=None):
 
   if isinstance(table, bigquery.TableReference):
     return table
+  elif callable(table):
+    return table
 
   table_reference = bigquery.TableReference()
   # If dataset argument is not specified, the expectation is that the
@@ -256,24 +260,67 @@ class BigQueryWrapper(object):
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def _insert_load_job(self, project_id, job_id, table_reference, source_uris,
-                       schema=None):
+  def _insert_copy_job(self,
+                       project_id,
+                       job_id,
+                       from_table_reference,
+                       to_table_reference,
+                       create_disposition=None,
+                       write_disposition=None):
+    reference = bigquery.JobReference()
+    reference.jobId = job_id
+    reference.projectId = project_id
+    request = bigquery.BigqueryJobsInsertRequest(
+        projectId=project_id,
+        job=bigquery.Job(
+            configuration=bigquery.JobConfiguration(
+                copy=bigquery.JobConfigurationTableCopy(
+                    destinationTable=to_table_reference,
+                    sourceTable=from_table_reference,
+                    createDisposition=create_disposition,
+                    writeDisposition=write_disposition,
+                )
+            ),
+            jobReference=reference,
+        )
+    )
+
+    logging.info("Inserting job request: %s", request)
+    response = self.client.jobs.Insert(request)
+    logging.info("Response was %s", response)
+    return response.jobReference
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _insert_load_job(self,
+                       project_id,
+                       job_id,
+                       table_reference,
+                       source_uris,
+                       schema=None,
+                       write_disposition=None,
+                       create_disposition=None):
     reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
     request = bigquery.BigqueryJobsInsertRequest(
-        projectId=table_reference.project_id,
+        projectId=project_id,
         job=bigquery.Job(
             configuration=bigquery.JobConfiguration(
                 load=bigquery.JobConfigurationLoad(
-                    source_uris=source_uris,
-                    destination_table=table_reference,
+                    sourceUris=source_uris,
+                    destinationTable=table_reference,
+                    schema=schema,
+                    writeDisposition=write_disposition,
+                    createDisposition=create_disposition,
+                    sourceFormat='NEWLINE_DELIMITED_JSON',
+                    autodetect=schema is None,
                 )
             ),
             jobReference=reference,
         )
     )
-
     response = self.client.jobs.Insert(request)
-    return response.jobReference.jobId
+    return response.jobReference
 
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
@@ -476,6 +523,35 @@ class BigQueryWrapper(object):
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def get_job(self, project, job_id, location=None):
+    request = bigquery.BigqueryJobsGetRequest()
+    request.jobId = job_id
+    request.projectId = project
+    request.location = location
+
+    return self.client.jobs.Get(request)
+
+  def perform_load_job(self,
+                       destination,
+                       files,
+                       job_id,
+                       schema=None,
+                       write_disposition=None,
+                       create_disposition=None):
+    """Starts a job to load data into BigQuery.
+
+    Returns:
+      bigquery.JobReference with the information about the job that was started.
+    """
+    return self._insert_load_job(
+        destination.projectId, job_id, destination, files,
+        schema=schema,
+        create_disposition=create_disposition,
+        write_disposition=write_disposition)
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def get_or_create_table(
       self, project_id, dataset_id, table_id, schema,
       create_disposition, write_disposition):
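
A sketch of the new load-job helpers in isolation (illustrative only, not part of the patch; project, table and URI values are hypothetical; perform_load_job and get_job are the methods added above, and schema=None falls back to autodetection):

    from apache_beam.io.gcp import bigquery_tools

    wrapper = bigquery_tools.BigQueryWrapper()
    table = bigquery_tools.parse_table_reference('myproject:mydataset.mytable')
    job_ref = wrapper.perform_load_job(
        table,
        ['gs://my-temp-bucket/bq_load/file1.json'],  # newline-delimited JSON
        'beam_load_manual_0',
        schema=None)  # autodetect the schema from the files
    job = wrapper.get_job(job_ref.projectId, job_ref.jobId, job_ref.location)
    print(job.status.state)  # e.g. 'RUNNING' or 'DONE'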
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index c3069a3..5d17e89 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -47,7 +47,7 @@ def retry_on_http_and_value_error(exception):
 
 
 class BigqueryMatcher(BaseMatcher):
-  """Matcher that verifies Bigquery data with given query.
+  """Matcher that verifies the checksum of Bigquery data with given query.
 
   Fetch Bigquery data with given query, compute a hash string and compare
   with expected checksum.
@@ -106,3 +106,59 @@ class BigqueryMatcher(BaseMatcher):
     mismatch_description \
       .append_text("Actual checksum is ") \
       .append_text(self.checksum)
+
+
+class BigqueryFullResultMatcher(BaseMatcher):
+  """Matcher that verifies Bigquery data with given query.
+
+  Fetch Bigquery data with given query, compare to the expected data.
+  """
+
+  def __init__(self, project, query, data):
+    """Initialize BigQueryMatcher object.
+    Args:
+      project: The name (string) of the project.
+      query: The query (string) to perform.
+      data: List of tuples with the expected data.
+    """
+    if bigquery is None:
+      raise ImportError(
+          'Bigquery dependencies are not installed.')
+    if not query or not isinstance(query, str):
+      raise ValueError(
+          'Invalid argument: query. Please use non-empty string')
+
+    self.project = project
+    self.query = query
+    self.expected_data = [sorted(i) for i in data]
+
+  def _matches(self, _):
+    logging.info('Start verify Bigquery data.')
+    # Run query
+    bigquery_client = bigquery.Client(project=self.project)
+    response = self._query_with_retry(bigquery_client)
+    logging.info('Read from given query (%s), total rows %d',
+                 self.query, len(response))
+
+    self.actual_data = [sorted(i) for i in response]
+
+    # Verify result
+    return sorted(self.expected_data) == sorted(self.actual_data)
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry_on_http_and_value_error)
+  def _query_with_retry(self, bigquery_client):
+    """Run Bigquery query with retry if got error http response"""
+    query_job = bigquery_client.query(self.query)
+    return [row.values() for row in query_job]
+
+  def describe_to(self, description):
+    description \
+      .append_text("Expected data is ") \
+      .append_text(self.expected_data)
+
+  def describe_mismatch(self, pipeline_result, mismatch_description):
+    mismatch_description \
+      .append_text("Actual data is ") \
+      .append_text(self.actual_data)
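
The new matcher can be wired into an integration test the same way the checksum matcher is; a sketch (illustrative only; project, table and expected rows are hypothetical):

    from hamcrest.core.core.allof import all_of

    from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
    from apache_beam.testing.test_pipeline import TestPipeline

    verifier = BigqueryFullResultMatcher(
        project='myproject',
        query='SELECT name, language FROM mydataset.mytable',
        data=[('beam', 'py'), ('beam', 'java')])

    test_pipeline = TestPipeline(is_integration_test=True)
    args = test_pipeline.get_full_options_as_args(
        on_success_matcher=all_of(verifier))
    # 'args' is then passed to beam.Pipeline(argv=args), as in the ITs above.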
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index f281d75..957c903 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -590,27 +590,6 @@ class DataflowRunner(PipelineRunner):
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
 
-  def apply_WriteToBigQuery(self, transform, pcoll, options):
-    # Make sure this is the WriteToBigQuery class that we expected
-    if not isinstance(transform, beam.io.WriteToBigQuery):
-      return self.apply_PTransform(transform, pcoll, options)
-    standard_options = options.view_as(StandardOptions)
-    if standard_options.streaming:
-      if (transform.write_disposition ==
-          beam.io.BigQueryDisposition.WRITE_TRUNCATE):
-        raise RuntimeError('Can not use write truncation mode in streaming')
-      return self.apply_PTransform(transform, pcoll, options)
-    else:
-      return pcoll  | 'WriteToBigQuery' >> beam.io.Write(
-          beam.io.BigQuerySink(
-              transform.table_reference.tableId,
-              transform.table_reference.datasetId,
-              transform.table_reference.projectId,
-              transform.schema,
-              transform.create_disposition,
-              transform.write_disposition,
-              kms_key=transform.kms_key))
-
   def apply_GroupByKey(self, transform, pcoll, options):
     # Infer coder of parent.
     #
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index ba01861..9eed963 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -684,7 +684,11 @@ class PTransformWithSideInputs(PTransform):
     self._cached_fn = self.fn
 
     # Ensure fn and side inputs are picklable for remote execution.
-    self.fn = pickler.loads(pickler.dumps(self.fn))
+    try:
+      self.fn = pickler.loads(pickler.dumps(self.fn))
+    except RuntimeError:
+      raise RuntimeError('Unable to pickle fn %s' % self.fn)
+
     self.args = pickler.loads(pickler.dumps(self.args))
     self.kwargs = pickler.loads(pickler.dumps(self.kwargs))
 
