[ https://issues.apache.org/jira/browse/AIRFLOW-2146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anonymous reassigned AIRFLOW-2146:
----------------------------------

    Assignee:     (was: Kaxil Naik)

> Initialize default Google BigQuery Connection with valid conn_type & Fix 
> broken DBApiHook
> -----------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2146
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2146
>             Project: Apache Airflow
>          Issue Type: Task
>          Components: contrib, gcp
>            Reporter: Kaxil Naik
>            Priority: Major
>             Fix For: 1.10.0
>
>
> `airflow initdb` creates a connection with conn_id='bigquery_default' and 
> conn_type='bigquery'. However, 'bigquery' is not a valid conn_type according 
> to models.Connection._types; BigQuery connections should use the 
> google_cloud_platform conn_type.
> Also, as [renanleme|https://github.com/renanleme] noted 
> [here|https://github.com/apache/incubator-airflow/pull/3031#issuecomment-368132910],
> his DAGs break when calling `get_records()` on a BigQueryHook, which extends 
> DbApiHook.
> *Error Log*:
> {code}
> Traceback (most recent call last):
>   File "/src/apache-airflow/airflow/models.py", line 1519, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/airflow/dags/lib/operators/test_operator.py", line 21, in execute
>     records = self._get_db_hook(self.source_conn_id).get_records(self.sql)
>   File "/src/apache-airflow/airflow/hooks/base_hook.py", line 92, in get_records
>     raise NotImplementedError()
> {code}
> *Dag*:
> {code:python}
> from datetime import datetime
>
> from airflow import DAG
> from lib.operators.test_operator import TestOperator
>
> default_args = {
>     'depends_on_past': False,
>     'start_date': datetime(2018, 2, 21),
> }
>
> dag = DAG(
>     'test_dag',
>     default_args=default_args,
>     schedule_interval='0 6 * * *'
> )
>
> sql = '''
>     SELECT id from YOUR_BIGQUERY_TABLE limit 10
> '''
>
> compare_grouped_event = TestOperator(
>     task_id='test_operator',
>     source_conn_id='gcp_airflow',
>     sql=sql,
>     dag=dag
> )
> {code}
> *Operator*:
> {code:python}
> from airflow.hooks.base_hook import BaseHook
> from airflow.models import BaseOperator
> from airflow.utils.decorators import apply_defaults
>
>
> class TestOperator(BaseOperator):
>     @apply_defaults
>     def __init__(
>             self,
>             sql,
>             source_conn_id=None,
>             *args, **kwargs):
>         super(TestOperator, self).__init__(*args, **kwargs)
>         self.sql = sql
>         self.source_conn_id = source_conn_id
>
>     def execute(self, context=None):
>         records = self._get_db_hook(self.source_conn_id).get_records(self.sql)
>         self.log.info('Fetched records from source')
>
>     @staticmethod
>     def _get_db_hook(conn_id):
>         return BaseHook.get_hook(conn_id=conn_id)
> {code}
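For context, a minimal self-contained sketch (plain Python, not the real Airflow classes) of why the call above ends in NotImplementedError: hook dispatch keys off the connection's conn_type, and an unregistered type such as 'bigquery' falls through to the abstract base implementation. The real BaseHook.get_hook takes a conn_id and resolves the conn_type from the stored Connection; that database lookup is elided here.

```python
# Simplified stand-ins for airflow.hooks.base_hook.BaseHook and
# contrib's BigQueryHook -- an illustration, not the real classes.

class BaseHook(object):
    def get_records(self, sql):
        # Abstract in the base class; concrete DbApiHook subclasses override it.
        raise NotImplementedError()

    @classmethod
    def get_hook(cls, conn_type):
        # Real Airflow dispatches on the Connection's conn_type; an
        # unrecognized type gives back the base class, whose get_records raises.
        hooks = {'google_cloud_platform': BigQueryHook}
        return hooks.get(conn_type, BaseHook)()


class BigQueryHook(BaseHook):
    def get_records(self, sql):
        return [('row1',)]  # stand-in for real query results


# 'bigquery' is not a registered conn_type, so we get the bare base hook:
try:
    BaseHook.get_hook('bigquery').get_records('SELECT 1')
except NotImplementedError:
    print('NotImplementedError for conn_type=bigquery')

# The valid conn_type dispatches to a hook that implements get_records:
print(BaseHook.get_hook('google_cloud_platform').get_records('SELECT 1'))
```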


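And a sketch of the direction a fix could take: have initdb register bigquery_default under the valid google_cloud_platform conn_type, so that both the _types check and hook dispatch work. The Connection class and abridged _types set below are toy stand-ins; in particular, the real airflow.models.Connection does not enforce conn_type validity at construction time, so the validator here exists only to illustrate why 'bigquery' is invalid per _types.

```python
# Abridged stand-in for the conn_type values listed in
# models.Connection._types (the full list is longer).
VALID_CONN_TYPES = {'google_cloud_platform', 'mysql', 'postgres'}


class Connection(object):
    """Toy Connection that rejects conn_types absent from _types."""

    def __init__(self, conn_id, conn_type):
        if conn_type not in VALID_CONN_TYPES:
            raise ValueError('invalid conn_type: %s' % conn_type)
        self.conn_id = conn_id
        self.conn_type = conn_type


# Before the fix: initdb's default uses the invalid 'bigquery' type.
try:
    Connection('bigquery_default', 'bigquery')
except ValueError as exc:
    print(exc)

# After the fix: the default connection uses the valid conn_type.
conn = Connection('bigquery_default', 'google_cloud_platform')
print(conn.conn_id, conn.conn_type)
```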

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)