boittega opened a new issue #8713:
URL: https://github.com/apache/airflow/issues/8713


   **Apache Airflow version**: 1.10.10
   
   **What happened**:
   
   `SparkSqlHook` does not use a connection, yet it requires one. The default
   `conn_id` is `spark_sql_default`, and if this connection doesn't exist the
   hook raises an error:
   ```
   Traceback (most recent call last):
     File "/Users/rbottega/Documents/airflow_latest/env/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
       result = task_copy.execute(context=context)
     File "/Users/rbottega/Documents/airflow_latest/env/lib/python3.7/site-packages/airflow/contrib/operators/spark_sql_operator.py", line 109, in execute
       yarn_queue=self._yarn_queue
     File "/Users/rbottega/Documents/airflow_latest/env/lib/python3.7/site-packages/airflow/contrib/hooks/spark_sql_hook.py", line 75, in __init__
       self._conn = self.get_connection(conn_id)
     File "/Users/rbottega/Documents/airflow_latest/env/lib/python3.7/site-packages/airflow/hooks/base_hook.py", line 84, in get_connection
       conn = random.choice(list(cls.get_connections(conn_id)))
     File "/Users/rbottega/Documents/airflow_latest/env/lib/python3.7/site-packages/airflow/hooks/base_hook.py", line 80, in get_connections
       return secrets.get_connections(conn_id)
     File "/Users/rbottega/Documents/airflow_latest/env/lib/python3.7/site-packages/airflow/secrets/__init__.py", line 56, in get_connections
       raise AirflowException("The conn_id `{0}` isn't defined".format(conn_id))
   airflow.exceptions.AirflowException: The conn_id `spark_sql_default` isn't defined
   ```
   If a valid connection is specified, it has no effect: the `self._conn`
   variable is never used, and the `get_conn` method is empty.
   ```
       def get_conn(self):
           pass
   ```
   
   **What you expected to happen**:
   
   It should either follow the same behaviour as `SparkSubmitHook`, which
   receives the master host and extra parameters from the connection, or not
   require a connection ID at all.
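
To make the first option concrete, here is a rough, standalone sketch of how the master and queue could be resolved from a connection, falling back to defaults when the connection is missing. It does not import Airflow: `FakeConnection`, `resolve_spark_settings`, the `lookup` callable, the `queue` extra key, and the `yarn`/`default` fallbacks are all assumptions made for illustration, not the actual hook code.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional


@dataclass
class FakeConnection:
    """Hypothetical stand-in for an Airflow connection record."""
    host: Optional[str] = None
    port: Optional[int] = None
    extra: Dict[str, str] = field(default_factory=dict)


def resolve_spark_settings(
    conn_id: Optional[str],
    lookup: Callable[[str], FakeConnection],
    master: Optional[str] = None,
    yarn_queue: Optional[str] = None,
) -> Dict[str, str]:
    """Pick master and queue: explicit arguments first, then the
    connection, then assumed defaults. A missing connection is not fatal."""
    conn = None
    if conn_id is not None:
        try:
            conn = lookup(conn_id)
        except KeyError:
            # Assumption: an undefined conn_id falls back to defaults
            # instead of raising, unlike the behaviour reported above.
            conn = None

    if conn is not None:
        if master is None and conn.host:
            master = f"{conn.host}:{conn.port}" if conn.port else conn.host
        if yarn_queue is None:
            yarn_queue = conn.extra.get("queue")

    return {"master": master or "yarn", "yarn_queue": yarn_queue or "default"}


# Usage: a dict plays the role of the connection store.
store = {
    "spark_sql_default": FakeConnection(
        host="spark://host", port=7077, extra={"queue": "etl"}
    )
}
settings = resolve_spark_settings("spark_sql_default", store.__getitem__)
```

Whether explicit operator arguments or connection values should win is a design choice; the sketch lets explicit arguments take precedence.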
   
   **How to reproduce it**:
   Create a DAG with a `SparkSqlOperator` without having created the
   `spark_sql_default` connection.
   ```
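       from airflow.contrib.operators.spark_sql_operator import SparkSqlOperator

       # No conn_id is passed, so the hook falls back to `spark_sql_default`.
       sql_job = SparkSqlOperator(
           sql="SELECT * FROM test",
           master="local",
           task_id="sql_job"
       )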
   ```
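
Until this is fixed, one possible workaround is to define a placeholder connection so the lookup succeeds. The sketch below assumes Airflow picks up connections from `AIRFLOW_CONN_<CONN_ID>` environment variables holding a connection URI, and that the variable is set in the environment of the process that runs the task. The helper `conn_env_var` and the URI value are illustrative only; since the hook never reads the connection, the value should not matter.

```python
import os


def conn_env_var(conn_id: str) -> str:
    """Build the environment variable name for a connection ID
    (hypothetical helper, assuming the AIRFLOW_CONN_ prefix convention)."""
    return "AIRFLOW_CONN_" + conn_id.upper()


# Placeholder URI: only needed so that the conn_id is considered defined.
os.environ[conn_env_var("spark_sql_default")] = "spark://local"
```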
   
   
   **Anything else we need to know**:
   
   I am happy to implement either of these solutions.
   

