Repository: incubator-airflow Updated Branches: refs/heads/master 1e2d23738 -> 4a4b024cb
[AIRFLOW-1529] Add logic supporting quoted newlines in Google BigQuery load jobs Closes #2545 from wileeam/bq-allow-quoted-nl Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4a4b024c Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4a4b024c Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4a4b024c Branch: refs/heads/master Commit: 4a4b024cb1963bf0c3b19fc55b995c3a5121191c Parents: 1e2d237 Author: Guillermo Rodriguez Cano <[email protected]> Authored: Wed Aug 23 14:36:49 2017 -0700 Committer: Chris Riccomini <[email protected]> Committed: Wed Aug 23 14:36:49 2017 -0700 ---------------------------------------------------------------------- airflow/contrib/hooks/bigquery_hook.py | 6 ++++++ airflow/contrib/operators/gcs_to_bq.py | 5 +++++ 2 files changed, 11 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4a4b024c/airflow/contrib/hooks/bigquery_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index 73e0a43..b979ed9 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -385,6 +385,7 @@ class BigQueryBaseCursor(object): field_delimiter=',', max_bad_records=0, quote_character=None, + allow_quoted_newlines=False, schema_update_options=()): """ Executes a BigQuery load command to load data from Google Cloud Storage @@ -421,6 +422,8 @@ class BigQueryBaseCursor(object): :type max_bad_records: int :param quote_character: The value that is used to quote data sections in a CSV file. :type quote_character: string + :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false). 
+ :type allow_quoted_newlines: boolean :param schema_update_options: Allows the schema of the destination table to be updated as a side effect of the load job. :type schema_update_options: list @@ -500,6 +503,9 @@ class BigQueryBaseCursor(object): if quote_character: configuration['load']['quote'] = quote_character + if allow_quoted_newlines: + configuration['load']['allowQuotedNewlines'] = allow_quoted_newlines + return self.run_with_configuration(configuration) def run_with_configuration(self, configuration): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4a4b024c/airflow/contrib/operators/gcs_to_bq.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py index b65d135..bab5abe 100644 --- a/airflow/contrib/operators/gcs_to_bq.py +++ b/airflow/contrib/operators/gcs_to_bq.py @@ -45,6 +45,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): field_delimiter=',', max_bad_records=0, quote_character=None, + allow_quoted_newlines=False, max_id_key=None, bigquery_conn_id='bigquery_default', google_cloud_storage_conn_id='google_cloud_storage_default', @@ -87,6 +88,8 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): :type max_bad_records: int :param quote_character: The value that is used to quote data sections in a CSV file. :type quote_character: string + :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false). + :type allow_quoted_newlines: boolean :param max_id_key: If set, the name of a column in the BigQuery table that's to be loaded. This will be used to select the MAX value from BigQuery after the load occurs. 
The results will be returned by the @@ -124,6 +127,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): self.field_delimiter = field_delimiter self.max_bad_records = max_bad_records self.quote_character = quote_character + self.allow_quoted_newlines = allow_quoted_newlines self.max_id_key = max_id_key self.bigquery_conn_id = bigquery_conn_id @@ -161,6 +165,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): field_delimiter=self.field_delimiter, max_bad_records=self.max_bad_records, quote_character=self.quote_character, + allow_quoted_newlines=self.allow_quoted_newlines, schema_update_options=self.schema_update_options) if self.max_id_key:
