[ https://issues.apache.org/jira/browse/AIRFLOW-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16899909#comment-16899909 ]

Tomasz Urbaszek commented on AIRFLOW-2308:
------------------------------------------

{code:java}
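import os

import airflow.utils.dates
from airflow import models
from airflow.contrib.hooks.bigquery_hook import BigQueryBaseCursor, BigQueryHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

# Assumption: project ID read from an environment variable, as in the GCP example DAGs.
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")


class MySensor(BaseSensorOperator):
    """Sensor that succeeds once the given BigQuery job has finished."""

    @apply_defaults
    def __init__(self,
                 project_id,
                 job_id,
                 bigquery_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.project_id = project_id
        self.job_id = job_id
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to

    def poke(self, context):
        hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                            delegate_to=self.delegate_to)
        service = hook.get_service()
        bq_cursor = BigQueryBaseCursor(service, self.project_id)
        # True once the job is DONE; API errors such as the 404 below propagate.
        return bq_cursor.poll_job_complete(self.job_id)


with models.DAG(
    "example_sensor_test",
    default_args={"start_date": airflow.utils.dates.days_ago(1)},
    schedule_interval=None,
) as dag:
    # job_id="none" does not exist, so every poke fails with an HTTP 404.
    check_job = MySensor(
        task_id="check_job",
        project_id=PROJECT_ID,
        job_id="none",
    )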
{code}
Running "airflow tasks test example_sensor_test check_job 2019-01-01" gives:

{code:java}
[2019-08-05 08:45:56,410] {taskinstance.py:1076} INFO - Marking task as FAILED.
Traceback (most recent call last):
  File "/workspace/airflow/contrib/hooks/bigquery_hook.py", line 1369, in poll_job_complete
    job = jobs.get(projectId=self.project_id, jobId=job_id).execute(num_retries=self.num_retries)
  File "/root/.virtualenvs/airflow36/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/root/.virtualenvs/airflow36/lib/python3.6/site-packages/googleapiclient/http.py", line 851, in execute
    raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 404 when requesting https://www.googleapis.com/bigquery/v2/projects/polidea-airflow/jobs/none?alt=json returned "Not found: Job polidea-airflow:none">
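{code}

So the method poll_job_complete does exist on BigQueryBaseCursor and can be called from a sensor: the call reaches the BigQuery API and fails only with a 404 because there is no job with the ID "none", not with an AttributeError.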
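For anyone following along, here is a minimal, dependency-free sketch of what the polling step appears to do, reconstructed from the traceback above: fetch the job via `jobs.get(...).execute(num_retries=...)`, report whether its `status.state` is `DONE`, and let errors such as the 404 propagate so the task fails. `FakeHttpError`, `FakeRequest` and `FakeJobs` are hypothetical stand-ins for googleapiclient's `HttpError` and the BigQuery jobs resource, and treating HTTP 500/503 as retryable is an assumption about the hook that this log does not show.

```python
# Dependency-free sketch of the polling step; the Fake* classes are
# hypothetical stand-ins, not googleapiclient or Airflow APIs.


class FakeHttpError(Exception):
    """Stand-in for googleapiclient.errors.HttpError, carrying an HTTP status."""

    def __init__(self, status, message=""):
        super().__init__(f"<HttpError {status}: {message}>")
        self.status = status


class FakeRequest:
    """Mimics the request object returned by jobs().get(...)."""

    def __init__(self, outcome):
        self._outcome = outcome

    def execute(self, num_retries=0):
        # num_retries is accepted only to mirror the real call site.
        if isinstance(self._outcome, Exception):
            raise self._outcome
        return self._outcome


class FakeJobs:
    """Mimics service.jobs(): unknown job IDs yield a 404, like job "none"."""

    def __init__(self, outcomes):
        self._outcomes = outcomes

    def get(self, projectId, jobId):
        missing = FakeHttpError(404, f"Not found: Job {projectId}:{jobId}")
        return FakeRequest(self._outcomes.get(jobId, missing))


def poll_job_complete(jobs, project_id, job_id, num_retries=0):
    """Return True if the job is DONE, False if it is not finished yet.

    Assumed behaviour: HTTP 500/503 count as transient (return False so the
    sensor pokes again); any other error, e.g. 404, is re-raised.
    """
    try:
        job = jobs.get(projectId=project_id, jobId=job_id).execute(
            num_retries=num_retries
        )
    except FakeHttpError as err:
        if err.status in (500, 503):
            return False
        raise
    return job["status"]["state"] == "DONE"


jobs = FakeJobs({
    "job_done": {"status": {"state": "DONE"}},
    "job_running": {"status": {"state": "RUNNING"}},
    "job_flaky": FakeHttpError(503, "Service unavailable"),
})
print(poll_job_complete(jobs, "my-project", "job_done"))     # True
print(poll_job_complete(jobs, "my-project", "job_running"))  # False
```

Calling it with job ID "none" raises the fake 404, which is the same shape of failure the sensor hits above.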

> Method poll_job_complete in BigQueryBaseCursor class doesn't work outside its class
> -----------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2308
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2308
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: contrib, gcp
>    Affects Versions: 2.0.0
>            Reporter: Guillermo Rodríguez Cano
>            Priority: Major
>
> We have encountered a strange behaviour in the aforementioned method
> `poll_job_complete` of the class `BigQueryBaseCursor` while creating a
> sensor that should simply poll for the completion of a BigQuery job.
> After creating a `BigQueryBaseCursor` object, when we call the method
> `poll_job_complete` on that object we get the following error:
> `AttributeError: 'BigQueryBaseCursor' object has no attribute 'poll_job_complete'`.
> However, if we copy and paste the code contained in the `poll_job_complete`
> function into our sensor, it works.
> We are not sure what exactly the problem is, or why it happens.
> Ideally we would like to do something like this, but we get that error:
> {code:java}
> hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, delegate_to=self.delegate_to)
> service = hook.get_service()
> bqCursor = BigQueryBaseCursor(service, self.project_id)
> return bqCursor.poll_job_complete(self.job_id)
> {code}
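The `AttributeError` in the report only says that the object Python received had no attribute named `poll_job_complete`; since the method is reachable in the run above, one possible but unconfirmed explanation is that the installed Airflow differed from the source being read. Below is a minimal sketch that prefers the cursor's own method and otherwise falls back to a copy of the polling logic, like the reporter's copy-paste workaround. `poll_job`, `inline_poll_job_complete` and the `Fake*`/`CursorWithMethod` classes are hypothetical, and the sketch assumes the cursor keeps `service`, `project_id` and `num_retries` attributes (the traceback shows `self.project_id` and `self.num_retries`, but `service` is an assumption).

```python
from types import SimpleNamespace


def inline_poll_job_complete(cursor, job_id):
    """Hypothetical copy of the polling logic, as in the reporter's workaround.

    Assumes `cursor` exposes `service`, `project_id` and `num_retries`.
    Error handling is omitted, so any API error propagates to the caller.
    """
    job = cursor.service.jobs().get(
        projectId=cursor.project_id, jobId=job_id
    ).execute(num_retries=cursor.num_retries)
    return job["status"]["state"] == "DONE"


def poll_job(cursor, job_id):
    """Use the cursor's own poll_job_complete when present, else the inline copy."""
    method = getattr(cursor, "poll_job_complete", None)
    if callable(method):
        return method(job_id)
    return inline_poll_job_complete(cursor, job_id)


# Hypothetical fakes standing in for the discovery-based BigQuery service.
class FakeRequest:
    def __init__(self, state):
        self.state = state

    def execute(self, num_retries=0):
        return {"status": {"state": self.state}}


class FakeJobs:
    def get(self, projectId, jobId):
        return FakeRequest("DONE" if jobId == "job_done" else "RUNNING")


class FakeService:
    def jobs(self):
        return FakeJobs()


class CursorWithMethod:
    """Cursor-like object that already provides poll_job_complete."""

    def poll_job_complete(self, job_id):
        return job_id == "job_done"


# No poll_job_complete attribute here, so poll_job uses the inline copy.
old_cursor = SimpleNamespace(service=FakeService(), project_id="my-project", num_retries=0)
print(poll_job(old_cursor, "job_done"))             # True (inline fallback)
print(poll_job(CursorWithMethod(), "job_running"))  # False (cursor's own method)
```

The `getattr` check keeps a sensor working either way, but it also hides the mismatch; aligning the installed Airflow version with the code being read is the cleaner fix.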



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
