[ https://issues.apache.org/jira/browse/AIRFLOW-96?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15282026#comment-15282026 ]
dmtran commented on AIRFLOW-96:
-------------------------------
I guess you mean the S3PrefixSensor also needs to be updated.
Sure, I'll give this a try.
> s3_conn_id of s3KeySensor cannot be defined using an environment variable
> -------------------------------------------------------------------------
>
> Key: AIRFLOW-96
> URL: https://issues.apache.org/jira/browse/AIRFLOW-96
> Project: Apache Airflow
> Issue Type: Bug
> Components: operators
> Affects Versions: Airflow 1.6.2
> Environment: Python Version: 2.7.11
> Operating System: OS X El Capitan 10.11.4
> Reporter: dmtran
> Priority: Minor
>
> According to https://pythonhosted.org/airflow/concepts.html#connections,
> Airflow can reference connections defined in operating-system environment
> variables. A variable is treated as a connection only if its name is
> prefixed with AIRFLOW_CONN_.
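In other words, defining a connection this way means setting one environment variable named AIRFLOW_CONN_ followed by the conn_id. Below is a minimal sketch of that lookup in plain Python. It has no Airflow import, so it runs under both 2.7 and 3. `env_conn_uri` is a hypothetical helper. Upper-casing the conn_id is an assumption based on how later Airflow releases resolve these variables.

```python
import os

# Same value as airflow.hooks.base_hook.CONN_ENV_PREFIX.
CONN_ENV_PREFIX = 'AIRFLOW_CONN_'


def env_conn_uri(conn_id, environ=None):
    """Return the value stored for conn_id in the environment, or None.

    Hypothetical helper. Assumes the conn_id is upper-cased before the
    lookup, as BaseHook.get_connection does in later Airflow releases.
    """
    if environ is None:
        environ = os.environ
    return environ.get(CONN_ENV_PREFIX + conn_id.upper())


# Placeholder value; a real deployment stores a connection URI here.
env = {'AIRFLOW_CONN_S3_CONNECTION': 'dummy-uri'}
print(env_conn_uri('S3_CONNECTION', env))  # dummy-uri
print(env_conn_uri('s3_connection', env))  # dummy-uri
print(env_conn_uri('MISSING', env))        # None
```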
> This doesn't work with S3KeySensor; the following exception is raised:
> {noformat}
> [2016-05-10 17:01:37,101] {models.py:1041} ERROR - conn_id doesn't exist in the repository
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1000, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 65, in execute
>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/Users/dmtran/git_repos/coordinator/airflow/dags/test-s3.py", line 24, in check_key_in_s3
>     s3_conn_id='S3_CONNECTION')
>   File "/usr/local/lib/python2.7/site-packages/airflow/utils.py", line 461, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/site-packages/airflow/operators/sensors.py", line 332, in __init__
>     raise AirflowException("conn_id doesn't exist in the repository")
> AirflowException: conn_id doesn't exist in the repository
> {noformat}
> You can reproduce this issue by triggering a DAG run of the following DAG:
> {code}
> from airflow.hooks.base_hook import CONN_ENV_PREFIX
> from airflow.operators import PythonOperator, S3KeySensor
> from airflow.models import DAG
> from datetime import datetime
> import os
>
> args = {
>     'owner': 'airflow',
>     'start_date': datetime(2016, 5, 10, 7)
> }
>
> dag = DAG(dag_id='test-s3',
>           default_args=args,
>           schedule_interval=None)
>
>
> def check_key_in_s3(**context):
>     # Define the connection only through an AIRFLOW_CONN_ environment variable.
>     os.environ[CONN_ENV_PREFIX + 'S3_CONNECTION'] = (
>         '{ "aws_access_key_id": "dummyAccessKey", '
>         '"aws_secret_access_key": "dummySecretKey" }')
>     # S3KeySensor.__init__ raises AirflowException here, because it only
>     # looks the conn_id up in the metadata database.
>     sensor = S3KeySensor(
>         task_id='s3keysensor',
>         bucket_name='dummy_bucket',
>         bucket_key='dummy_key',
>         dag=dag,
>         s3_conn_id='S3_CONNECTION')
>     sensor.execute(context)
>
>
> check_s3_key_operator = PythonOperator(
>     task_id='check_key_in_s3',
>     python_callable=check_key_in_s3,
>     provide_context=True,
>     dag=dag)
> {code}
> The exception is raised by the following lines in the __init__ method of
> class S3KeySensor, which look the conn_id up only in the metadata database
> and never consult the environment:
> {code}
> db = session.query(DB).filter(DB.conn_id == s3_conn_id).first()
> if not db:
>     raise AirflowException("conn_id doesn't exist in the repository")
> {code}
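Because that check queries only the database, a connection defined solely through an AIRFLOW_CONN_ variable is rejected before the sensor ever runs. One possible fix, sketched here under stated assumptions, is to accept the conn_id when the environment defines it and fall back to the database otherwise. The sketch avoids importing Airflow so it stays runnable. The database query is passed in as a callable, `conn_exists_in_db`, which is hypothetical. `AirflowException` is a local stand-in for `airflow.exceptions.AirflowException`. `validate_conn_id` is likewise a hypothetical name.

```python
import os

# Same value as airflow.hooks.base_hook.CONN_ENV_PREFIX.
CONN_ENV_PREFIX = 'AIRFLOW_CONN_'


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException."""


def validate_conn_id(conn_id, conn_exists_in_db, environ=None):
    """Raise AirflowException unless conn_id is in the environment or the DB.

    conn_exists_in_db is a hypothetical callable standing in for the original
    session.query(DB).filter(DB.conn_id == conn_id).first() check.
    """
    if environ is None:
        environ = os.environ
    # Check the AIRFLOW_CONN_ variable first (upper-casing is an assumption),
    # so environment-defined connections no longer need a database row.
    if CONN_ENV_PREFIX + conn_id.upper() in environ:
        return
    # Fall back to the original metadata-database check.
    if not conn_exists_in_db(conn_id):
        raise AirflowException("conn_id doesn't exist in the repository")


def not_in_db(conn_id):
    return False


# Accepted: the connection exists only in the environment.
validate_conn_id('S3_CONNECTION', not_in_db,
                 {'AIRFLOW_CONN_S3_CONNECTION': 'dummy-uri'})

# Rejected: defined in neither place, so the original error is kept.
try:
    validate_conn_id('S3_CONNECTION', not_in_db, {})
except AirflowException as exc:
    print(exc)  # conn_id doesn't exist in the repository
```

Both S3KeySensor.__init__ and S3PrefixSensor.__init__ could call a helper like this in place of the inline query. An alternative is to drop the eager check entirely and rely on the hook's own connection lookup when the sensor first pokes.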