This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d61a1fa [BEAM-11517] Enable test_file_loads on Dataflow
new e9a7b44 Merge pull request #13601 from [BEAM-11517] Fix test_file_loads on Dataflow
d61a1fa is described below
commit d61a1fa2d7feed8a9cf1473da7090e461815eac5
Author: Udi Meiri <[email protected]>
AuthorDate: Tue Dec 22 17:23:09 2020 -0800
[BEAM-11517] Enable test_file_loads on Dataflow
---
.../apache_beam/io/gcp/bigquery_file_loads.py | 21 +++++++++++++--------
sdks/python/apache_beam/io/gcp/bigquery_test.py | 2 --
2 files changed, 13 insertions(+), 10 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 0342b4c..ae5ebcc 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -794,7 +794,7 @@ class BigQueryBatchFileLoads(beam.PTransform):
partitions_direct_to_destination,
load_job_name_pcv,
copy_job_name_pcv,
- singleton_pc,
+ p,
step_name):
"""Load data to BigQuery
@@ -832,7 +832,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]
destination_copy_job_ids_pc = (
- singleton_pc
+ p
+ | "ImpulseMonitorLoadJobs" >> beam.Create([None])
| "WaitForTempTableLoadJobs" >> beam.ParDo(
WaitForBQJobs(self.test_client),
beam.pvalue.AsList(temp_tables_load_job_ids_pc))
@@ -845,7 +846,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
copy_job_name_pcv))
finished_copy_jobs_pc = (
- singleton_pc
+ p
+ | "ImpulseMonitorCopyJobs" >> beam.Create([None])
| "WaitForCopyJobs" >> beam.ParDo(
WaitForBQJobs(self.test_client),
beam.pvalue.AsList(destination_copy_job_ids_pc)))
@@ -879,7 +881,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
*self.schema_side_inputs))
_ = (
- singleton_pc
+ p
+ | "ImpulseMonitorDestLoadJobs" >> beam.Create([None])
| "WaitForDestinationLoadJobs" >> beam.ParDo(
WaitForBQJobs(self.test_client),
beam.pvalue.AsList(destination_load_job_ids_pc)))
@@ -964,14 +967,16 @@ class BigQueryBatchFileLoads(beam.PTransform):
empty_pc,
load_job_name_pcv,
copy_job_name_pcv,
- singleton_pc,
+ p,
step_name))
else:
destination_load_job_ids_pc, destination_copy_job_ids_pc = (
self._load_data(multiple_partitions_per_destination_pc,
- single_partition_per_destination_pc,
- load_job_name_pcv, copy_job_name_pcv, singleton_pc,
- step_name))
+ single_partition_per_destination_pc,
+ load_job_name_pcv,
+ copy_job_name_pcv,
+ p,
+ step_name))
return {
self.DESTINATION_JOBID_PAIRS: destination_load_job_ids_pc,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 86614b4..fefe461 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -1245,8 +1245,6 @@ class PubSubBigQueryIT(unittest.TestCase):
@attr('IT')
def test_file_loads(self):
- if isinstance(self.test_pipeline.runner, TestDataflowRunner):
- self.skipTest('https://issuetracker.google.com/issues/118375066')
self._run_pubsub_bq_pipeline(
WriteToBigQuery.Method.FILE_LOADS, triggering_frequency=20)