Repository: incubator-airflow Updated Branches: refs/heads/master 38c86bbbc -> 9fd0beaac
[AIRFLOW-1394] Add quote_character param to GCS hook and operator Closes #2428 from dclubb/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9fd0beaa Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9fd0beaa Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9fd0beaa Branch: refs/heads/master Commit: 9fd0beaacbe8943e81b50b4a88f6ffedb6b437f5 Parents: 38c86bb Author: David Clubb <[email protected]> Authored: Thu Jul 13 14:13:39 2017 -0700 Committer: Chris Riccomini <[email protected]> Committed: Thu Jul 13 14:13:46 2017 -0700 ---------------------------------------------------------------------- airflow/contrib/hooks/bigquery_hook.py | 6 ++++++ airflow/contrib/operators/gcs_to_bq.py | 5 +++++ 2 files changed, 11 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9fd0beaa/airflow/contrib/hooks/bigquery_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index cc76953..0950b22 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -375,6 +375,7 @@ class BigQueryBaseCursor(object): write_disposition='WRITE_EMPTY', field_delimiter=',', max_bad_records=0, + quote_character=None, schema_update_options=()): """ Executes a BigQuery load command to load data from Google Cloud Storage @@ -409,6 +410,8 @@ class BigQueryBaseCursor(object): :param max_bad_records: The maximum number of bad records that BigQuery can ignore when running the job. :type max_bad_records: int + :param quote_character: The value that is used to quote data sections in a CSV file. + :type quote_character: string :param schema_update_options: Allows the schema of the desitination table to be updated as a side effect of the load job. :type schema_update_options: list @@ -485,6 +488,9 @@ class BigQueryBaseCursor(object): if max_bad_records: configuration['load']['maxBadRecords'] = max_bad_records + if quote_character: + configuration['load']['quote'] = quote_character + return self.run_with_configuration(configuration) def run_with_configuration(self, configuration): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9fd0beaa/airflow/contrib/operators/gcs_to_bq.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py index 44cf7b6..b65d135 100644 --- a/airflow/contrib/operators/gcs_to_bq.py +++ b/airflow/contrib/operators/gcs_to_bq.py @@ -44,6 +44,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): write_disposition='WRITE_EMPTY', field_delimiter=',', max_bad_records=0, + quote_character=None, max_id_key=None, bigquery_conn_id='bigquery_default', google_cloud_storage_conn_id='google_cloud_storage_default', @@ -84,6 +85,8 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): :param max_bad_records: The maximum number of bad records that BigQuery can ignore when running the job. :type max_bad_records: int + :param quote_character: The value that is used to quote data sections in a CSV file. + :type quote_character: string :param max_id_key: If set, the name of a column in the BigQuery table that's to be loaded. Thsi will be used to select the MAX value from BigQuery after the load occurs. The results will be returned by the @@ -120,6 +123,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): self.write_disposition = write_disposition self.field_delimiter = field_delimiter self.max_bad_records = max_bad_records + self.quote_character = quote_character self.max_id_key = max_id_key self.bigquery_conn_id = bigquery_conn_id @@ -156,6 +160,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): write_disposition=self.write_disposition, field_delimiter=self.field_delimiter, max_bad_records=self.max_bad_records, + quote_character=self.quote_character, schema_update_options=self.schema_update_options) if self.max_id_key:
