[ 
https://issues.apache.org/jira/browse/AIRFLOW-96?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dmtran updated AIRFLOW-96:
--------------------------
    Description: 
According to https://pythonhosted.org/airflow/concepts.html#connections, 
Airflow has the ability to reference connections via environment variables from 
the operating system. The environment variable needs to be prefixed with 
AIRFLOW_CONN_ to be considered a connection.
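For context, such an environment-backed connection is just a URI stored under the prefixed variable name. A minimal sketch of the convention (shown in Python 3 syntax, with dummy credentials):

```python
import os
from urllib.parse import urlparse

# A connection named S3_CONNECTION is defined by the env var
# AIRFLOW_CONN_S3_CONNECTION, whose value is a connection URI:
# <conn_type>://<login>:<password>@<host>
os.environ['AIRFLOW_CONN_S3_CONNECTION'] = 's3://dummyAccessKey:dummySecretKey@S3'

uri = urlparse(os.environ['AIRFLOW_CONN_S3_CONNECTION'])
print(uri.scheme, uri.username, uri.password)
```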

This doesn't work with S3KeySensor (or S3PrefixSensor); the following exception is raised:

{noformat}
[2016-05-10 17:01:37,101] {models.py:1041} ERROR - conn_id doesn't exist in the repository
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1000, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 65, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/Users/dmtran/git_repos/coordinator/airflow/dags/test-s3.py", line 24, in check_key_in_s3
    s3_conn_id='S3_CONNECTION')
  File "/usr/local/lib/python2.7/site-packages/airflow/utils.py", line 461, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/sensors.py", line 332, in __init__
    raise AirflowException("conn_id doesn't exist in the repository")
AirflowException: conn_id doesn't exist in the repository
{noformat}

You can reproduce this issue by triggering a DAG run of the following DAG:
{code}
from airflow.hooks.base_hook import CONN_ENV_PREFIX
from airflow.operators import *
from airflow.models import DAG
from datetime import datetime
import os

args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 5, 10, 7)
}

dag = DAG(dag_id='test-s3',
    default_args=args,
    schedule_interval=None)

os.environ[CONN_ENV_PREFIX + 'S3_CONNECTION'] = 's3://dummyAccessKey:dummySecretKey@S3'

sensor = S3KeySensor(
    task_id='s3keysensor',
    bucket_name='dummy_bucket',
    bucket_key='dummy_key',
    dag=dag,
    s3_conn_id='S3_CONNECTION')

{code}

The exception is raised by the following lines in the __init__ method of S3KeySensor:
{code}
        db = session.query(DB).filter(DB.conn_id == s3_conn_id).first()
        if not db:
            raise AirflowException("conn_id doesn't exist in the repository")
{code}
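A possible fix is to fall back to the environment before raising. The following is a hedged sketch of that idea, not Airflow's actual code: `conn_id_exists` is a hypothetical helper, and `known_db_conn_ids` stands in for the result of the metadata-DB query.

```python
import os

CONN_ENV_PREFIX = 'AIRFLOW_CONN_'  # same prefix airflow.hooks.base_hook uses

def conn_id_exists(conn_id, known_db_conn_ids):
    """Hypothetical helper: treat a conn_id as defined if it is in the
    metadata DB *or* exported as an AIRFLOW_CONN_ environment variable."""
    if conn_id in known_db_conn_ids:
        return True
    return (CONN_ENV_PREFIX + conn_id).upper() in os.environ

os.environ[CONN_ENV_PREFIX + 'S3_CONNECTION'] = 's3://dummyAccessKey:dummySecretKey@S3'
print(conn_id_exists('S3_CONNECTION', known_db_conn_ids=set()))
```

With such a check in place, the sensor's __init__ would only raise when the conn_id is absent from both the repository and the environment.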

  was:
According to https://pythonhosted.org/airflow/concepts.html#connections, 
Airflow has the ability to reference connections via environment variables from 
the operating system. The environment variable needs to be prefixed with 
AIRFLOW_CONN_ to be considered a connection.

This doesn't work with an S3KeySensor; the following exception is raised:

{noformat}
[2016-05-10 17:01:37,101] {models.py:1041} ERROR - conn_id doesn't exist in the repository
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1000, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 65, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/Users/dmtran/git_repos/coordinator/airflow/dags/test-s3.py", line 24, in check_key_in_s3
    s3_conn_id='S3_CONNECTION')
  File "/usr/local/lib/python2.7/site-packages/airflow/utils.py", line 461, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/sensors.py", line 332, in __init__
    raise AirflowException("conn_id doesn't exist in the repository")
AirflowException: conn_id doesn't exist in the repository
{noformat}

You can reproduce this issue by triggering a DAG run of the following DAG:
{code}
from airflow.hooks.base_hook import CONN_ENV_PREFIX
from airflow.operators import *
from airflow.models import DAG
from datetime import datetime
import os

args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 5, 10, 7)
}

dag = DAG(dag_id='test-s3',
    default_args=args,
    schedule_interval=None)

def check_key_in_s3(**context):
    os.environ[CONN_ENV_PREFIX + 'S3_CONNECTION'] = '{ "aws_access_key_id": "dummyAccessKey", "aws_secret_access_key": "dummySecretKey" }'
    sensor = S3KeySensor(
        task_id='s3keysensor',
        bucket_name='dummy_bucket',
        bucket_key='dummy_key',
        dag=dag,
        s3_conn_id='S3_CONNECTION')
    sensor.execute(context)

check_s3_key_operator = PythonOperator(
    task_id='check_key_in_s3',
    python_callable=check_key_in_s3,
    provide_context=True,
    dag=dag)
{code}

The exception is raised by the following lines in the __init__ method of S3KeySensor:
{code}
        db = session.query(DB).filter(DB.conn_id == s3_conn_id).first()
        if not db:
            raise AirflowException("conn_id doesn't exist in the repository")
{code}


> s3_conn_id of S3KeySensor and S3PrefixSensor cannot be defined using an 
> environment variable
> --------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-96
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-96
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators
>    Affects Versions: Airflow 1.6.2
>         Environment:     Python Version: 2.7.11
>     Operating System: OS X El Capitan 10.11.4
>            Reporter: dmtran
>            Assignee: dmtran
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
