[ https://issues.apache.org/jira/browse/AIRFLOW-96?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
dmtran updated AIRFLOW-96:
--------------------------
    Description: 
According to https://pythonhosted.org/airflow/concepts.html#connections, Airflow can reference connections via environment variables from the operating system. The environment variable needs to be prefixed with AIRFLOW_CONN_ to be considered a connection.

This doesn't work with S3KeySensor (or S3PrefixSensor); the following exception is raised:

{noformat}
[2016-05-10 17:01:37,101] {models.py:1041} ERROR - conn_id doesn't exist in the repository
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1000, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 65, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/Users/dmtran/git_repos/coordinator/airflow/dags/test-s3.py", line 24, in check_key_in_s3
    s3_conn_id='S3_CONNECTION')
  File "/usr/local/lib/python2.7/site-packages/airflow/utils.py", line 461, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/sensors.py", line 332, in __init__
    raise AirflowException("conn_id doesn't exist in the repository")
AirflowException: conn_id doesn't exist in the repository
{noformat}

You can reproduce this issue by triggering a DAG run of the following DAG:

{code}
from airflow.hooks.base_hook import CONN_ENV_PREFIX
from airflow.operators import *
from airflow.models import DAG
from datetime import datetime
import os

args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 5, 10, 7)
}

dag = DAG(dag_id='test-s3',
          default_args=args,
          schedule_interval=None)

os.environ[CONN_ENV_PREFIX + 'S3_CONNECTION'] = 's3://:dummyAccessKey:dummySecretKey@S3'

sensor = S3KeySensor(
    task_id='s3keysensor',
    bucket_name='dummy_bucket',
    bucket_key='dummy_key',
    dag=dag,
    s3_conn_id='S3_CONNECTION')
{code}

The exception is raised because of the following lines in the __init__ method of class S3KeySensor:

{code}
db = session.query(DB).filter(DB.conn_id == s3_conn_id).first()
if not db:
    raise AirflowException("conn_id doesn't exist in the repository")
{code}
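One way the sensor could honor environment-defined connections is to consult os.environ before falling back to the metadata DB. A minimal sketch of such a guard inside S3KeySensor.__init__ (hypothetical, not a committed fix; session, DB and AirflowException come from the sensor's module scope, and upper-casing the environment key is an assumption):

{code}
import os

from airflow.hooks.base_hook import CONN_ENV_PREFIX

# Hypothetical guard: only require a metadata-DB entry when no
# AIRFLOW_CONN_* environment variable defines the connection.
if (CONN_ENV_PREFIX + s3_conn_id).upper() not in os.environ:
    db = session.query(DB).filter(DB.conn_id == s3_conn_id).first()
    if not db:
        raise AirflowException("conn_id doesn't exist in the repository")
{code}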
was:
According to https://pythonhosted.org/airflow/concepts.html#connections, Airflow can reference connections via environment variables from the operating system. The environment variable needs to be prefixed with AIRFLOW_CONN_ to be considered a connection.

This doesn't work with an S3KeySensor; the following exception is raised:

{noformat}
[2016-05-10 17:01:37,101] {models.py:1041} ERROR - conn_id doesn't exist in the repository
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1000, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 65, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/Users/dmtran/git_repos/coordinator/airflow/dags/test-s3.py", line 24, in check_key_in_s3
    s3_conn_id='S3_CONNECTION')
  File "/usr/local/lib/python2.7/site-packages/airflow/utils.py", line 461, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/sensors.py", line 332, in __init__
    raise AirflowException("conn_id doesn't exist in the repository")
AirflowException: conn_id doesn't exist in the repository
{noformat}

You can reproduce this issue by triggering a DAG run of the following DAG:

{code}
from airflow.hooks.base_hook import CONN_ENV_PREFIX
from airflow.operators import *
from airflow.models import DAG
from datetime import datetime
import os

args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 5, 10, 7)
}

dag = DAG(dag_id='test-s3',
          default_args=args,
          schedule_interval=None)

def check_key_in_s3(**context):
    os.environ[CONN_ENV_PREFIX + 'S3_CONNECTION'] = \
        '{ "aws_access_key_id": "dummyAccessKey", "aws_secret_access_key": "dummySecretKey" }'
    sensor = S3KeySensor(
        task_id='s3keysensor',
        bucket_name='dummy_bucket',
        bucket_key='dummy_key',
        dag=dag,
        s3_conn_id='S3_CONNECTION')
    sensor.execute(context)

check_s3_key_operator = PythonOperator(
    task_id='check_key_in_s3',
    python_callable=check_key_in_s3,
    provide_context=True,
    dag=dag)
{code}

The exception is raised because of the following lines in the __init__ method of class S3KeySensor:

{code}
db = session.query(DB).filter(DB.conn_id == s3_conn_id).first()
if not db:
    raise AirflowException("conn_id doesn't exist in the repository")
{code}
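For contrast, the hook-side lookup does consult the environment, which isolates the fault to the sensors' eager DB check. A minimal verification sketch, assuming BaseHook.get_connection in this Airflow version performs the documented environment-first resolution:

{code}
import os

from airflow.hooks.base_hook import BaseHook, CONN_ENV_PREFIX

# Define the connection only through the environment; no metadata-DB row.
os.environ[CONN_ENV_PREFIX + 'S3_CONNECTION'] = 's3://:dummyAccessKey:dummySecretKey@S3'

# If the documented lookup is honored, this resolves without touching the DB.
conn = BaseHook.get_connection('S3_CONNECTION')
print(conn.conn_type)  # parsed from the URI scheme
{code}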
> s3_conn_id of S3KeySensor and S3PrefixSensor cannot be defined using an environment variable
> ---------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-96
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-96
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators
>    Affects Versions: Airflow 1.6.2
>         Environment: Python Version: 2.7.11
>                      Operating System: OS X El Capitan 10.11.4
>            Reporter: dmtran
>            Assignee: dmtran
>            Priority: Minor

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)