amoghrajesh opened a new pull request, #46957:
URL: https://github.com/apache/airflow/pull/46957
closes: https://github.com/apache/airflow/issues/46412
## Why
The `ConnectionResult` model, like the `Connection` model, maps the field
`schema_` to the alias `"schema"`. Because of this aliasing, the value of
`schema_` in `ConnectionResult` was being set to `None` during serialisation
and deserialisation.
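A minimal sketch of the failure mode, assuming pydantic v2 and using two hypothetical, trimmed-down models (`ConnectionResponse` and a reduced `ConnectionResult`) rather than Airflow's real definitions. It shows how dumping without `by_alias` silently drops the value, and how `exclude_unset` then omits the field from the JSON entirely:

```python
from typing import Optional

from pydantic import BaseModel, Field


# Hypothetical, trimmed-down stand-ins (not Airflow's real models).
# Both expose the Python field `schema_` under the alias "schema".
class ConnectionResponse(BaseModel):
    conn_id: str
    schema_: Optional[str] = Field(default=None, alias="schema")


class ConnectionResult(BaseModel):
    conn_id: str
    schema_: Optional[str] = Field(default=None, alias="schema")


# The API payload uses the alias, so `schema_` is populated correctly here.
response = ConnectionResponse.model_validate({"conn_id": "test_conn", "schema": "postgres"})

# model_dump() uses field names by default, so the key becomes "schema_" ...
dumped = response.model_dump()

# ... but ConnectionResult only accepts the alias "schema" by default, so the
# unknown "schema_" key is ignored and the field keeps its default of None.
result = ConnectionResult(**dumped)
print(result.schema_)  # None

# The field was never set, so exclude_unset drops it from the JSON.
print(result.model_dump_json(exclude_unset=True))  # {"conn_id":"test_conn"}
```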
Before this change, the output looked like this:
```
The response from api is: {'conn_id': 'test_jinja_connection_id', 'conn_type': 'postgres', 'host': 'database-1.cxmxicvi57az.us-east-2.rds.amazonaws.com', 'schema': 'postgres', 'login': 'postgres', 'password': 'password', 'port': 5432, 'extra': ''}
The client returned conn_id='test_jinja_connection_id' conn_type='postgres' host='database-1.cxmxicvi57az.us-east-2.rds.amazonaws.com' schema_='postgres' login='postgres' password='password' port=5432 extra=''
Conn result in supervisor conn_id='test_jinja_connection_id' conn_type='postgres' host='database-1.cxmxicvi57az.us-east-2.rds.amazonaws.com' schema_=None login='postgres' password='password' port=5432 extra='' type='ConnectionResult'
resp to send to task runner is b'{"conn_id":"test_jinja_connection_id","conn_type":"postgres","host":"database-1.cxmxicvi57az.us-east-2.rds.amazonaws.com","login":"postgres","password":"password","port":5432,"extra":"","type":"ConnectionResult"}'
```
Because `by_alias` and `exclude_unset` were not set, the `schema` value
returned by the client was not recognised as a field of `ConnectionResult`,
and the data was lost.
The fix is to dump with `by_alias=True`, so that the value is serialised under
the alias `"schema"` and mapped back to `schema_`.
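A sketch of the fix under the same kind of assumptions (pydantic v2, hypothetical trimmed-down models, not Airflow's actual classes): dumping with `by_alias=True` emits the key `"schema"`, which the receiving model maps back to `schema_`.

```python
from typing import Optional

from pydantic import BaseModel, Field


# Hypothetical stand-ins: field `schema_`, alias "schema".
class ConnectionResponse(BaseModel):
    conn_id: str
    schema_: Optional[str] = Field(default=None, alias="schema")


class ConnectionResult(BaseModel):
    conn_id: str
    schema_: Optional[str] = Field(default=None, alias="schema")


response = ConnectionResponse.model_validate({"conn_id": "test_conn", "schema": "postgres"})

# by_alias=True emits "schema", which ConnectionResult accepts and maps to `schema_`.
dumped = response.model_dump(by_alias=True)
result = ConnectionResult(**dumped)

print(dumped)          # {'conn_id': 'test_conn', 'schema': 'postgres'}
print(result.schema_)  # postgres
```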
The same applies when sending the result on to the task runner. The task
runner decodes messages with `decoder.validate_json(msg)`, which has no alias
option, so the payload must use the alias. We therefore dump by alias again
before sending it:
https://github.com/apache/airflow/compare/main...astronomer:airflow:AIP72-fix-connection-schemaissue?expand=1#diff-c2651fdee1a25e091e2a9d4f937f8032ca3d289d0de76f38ed88aee5df0f880dR790
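A sketch of that round trip, assuming the task runner's decoder behaves like a pydantic v2 `TypeAdapter` for a hypothetical, trimmed-down `ConnectionResult` (the real decoder and message models may differ). Only the payload dumped by alias survives decoding:

```python
from typing import Optional

from pydantic import BaseModel, Field, TypeAdapter


# Hypothetical stand-in: field `schema_`, alias "schema".
class ConnectionResult(BaseModel):
    conn_id: str
    schema_: Optional[str] = Field(default=None, alias="schema")


result = ConnectionResult.model_validate({"conn_id": "test_conn", "schema": "postgres"})
decoder = TypeAdapter(ConnectionResult)

# Dumped by field name, the key is "schema_", which validation does not accept.
by_name = result.model_dump_json(exclude_unset=True)
print(decoder.validate_json(by_name).schema_)   # None

# Dumped by alias, the value survives the round trip.
by_alias = result.model_dump_json(exclude_unset=True, by_alias=True)
print(by_alias)                                 # {"conn_id":"test_conn","schema":"postgres"}
print(decoder.validate_json(by_alias).schema_)  # postgres
```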
After making the changes:
```
The response from api is: {'conn_id': 'test_jinja_connection_id', 'conn_type': 'postgres', 'host': 'database-1.cxmxicvi57az.us-east-2.rds.amazonaws.com', 'schema': 'postgres', 'login': 'postgres', 'password': 'password', 'port': 5432, 'extra': ''}
The client returned conn_id='test_jinja_connection_id' conn_type='postgres' host='database-1.cxmxicvi57az.us-east-2.rds.amazonaws.com' schema_='postgres' login='postgres' password='password' port=5432 extra=''
Conn result in supervisor conn_id='test_jinja_connection_id' conn_type='postgres' host='database-1.cxmxicvi57az.us-east-2.rds.amazonaws.com' schema_='postgres' login='postgres' password='password' port=5432 extra='' type='ConnectionResult'
resp to send to task runner is b'{"conn_id":"test_jinja_connection_id","conn_type":"postgres","host":"database-1.cxmxicvi57az.us-east-2.rds.amazonaws.com","schema":"postgres","login":"postgres","password":"password","port":5432,"extra":"","type":"ConnectionResult"}'
```
## Testing:
DAG:
```
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.standard.operators.python import PythonOperator
from pendulum import today
from airflow.models import DAG

dag_name = "test_jinja_connection_id"


def conn_id_test(**context):
    print("CONTEXT: ", context)
    print(f"The connection type is: {context['get_conn_type']}")
    print(f"The host is: {context['check_host']}")
    print(f"The schema is: {context['get_schema']}")
    print(f"The login is: {context['get_login']}")
    print(f"The password is: {context['get_pass']}")  # returns '***', hiding the password
    print(f"The port is: {context['get_port']}")
    print(f"The extras are: {context['get_extras']}")
    print("asserting the connection type")
    assert context["get_conn_type"] == "postgres"
    print("asserting the host")
    assert context["check_host"] == "database-1.cxmxicvi57az.us-east-2.rds.amazonaws.com"
    print("asserting the schema")
    assert context["get_schema"] == "postgres"
    print("asserting the login")
    assert context["get_login"] == "postgres"
    print("asserting the port")
    assert context["get_port"] == "5432"


with DAG(
    dag_id=dag_name,
    schedule=None,
    start_date=today('UTC').add(days=-2),
    tags=["core"],
) as dag:
    #
    # P0 = SQLExecuteQueryOperator(
    #     task_id="create_table_define_cols",
    #     conn_id=f"{dag_name}_connection",
    #     sql="""
    #     CREATE TABLE IF NOT EXISTS jinja_connection_template_test(
    #     random_str varchar,
    #     herbs varchar,
    #     primary key(herbs));
    #     """,
    # )
    py1 = PythonOperator(
        task_id="check_jinja_conn_id",
        python_callable=conn_id_test,
        op_kwargs={
            "get_conn_type": f"{{{{ conn.{dag_name}.conn_type }}}}",
            "check_host": f"{{{{ conn.{dag_name}.host }}}}",
            "get_schema": f"{{{{ conn.{dag_name}.schema }}}}",
            "get_login": f"{{{{ conn.{dag_name}.login }}}}",
            "get_pass": f"{{{{ conn.{dag_name}.password }}}}",
            "get_port": f"{{{{ conn.{dag_name}.port }}}}",
            "get_extras": f"{{{{ conn.{dag_name}.extra }}}}",
        },
    )
    py1
```
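A note on the quadruple braces in `op_kwargs`: in an f-string, `{{` and `}}` emit literal braces, so each value becomes a Jinja template string that Airflow renders at runtime through the `conn` accessor. A minimal illustration using only plain Python string formatting:

```python
dag_name = "test_jinja_connection_id"

# "{{{{" produces the literal "{{" Jinja expects, while "{dag_name}" is
# still interpolated by Python before Airflow ever sees the string.
template = f"{{{{ conn.{dag_name}.schema }}}}"
print(template)  # {{ conn.test_jinja_connection_id.schema }}
```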

Logs:
```
[('191aa655be37', ' INFO - ::group::Log message source details
*** Found local files:
*** *
/root/airflow/logs/dag_id=test_jinja_connection_id/run_id=manual__2025-02-21T10:54:02.078910+00:00_gFe4vALV/task_id=check_jinja_conn_id/attempt=1.log
INFO - ::endgroup::
{"timestamp":"2025-02-21T10:54:02.873881","level":"debug","event":"Loading
plugins","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-21T10:54:02.875126","level":"debug","event":"Loading
plugins from directory: /files/plugins","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-21T10:54:02.876595","level":"debug","event":"Loading
plugins from entrypoints","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-21T10:54:02.876719","level":"debug","event":"Importing
entry_point plugin hive","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-21T10:54:02.881524","level":"debug","event":"Importing
entry_point plugin openlineage","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-21T10:54:02.970464","level":"debug","event":"Importing
entry_point plugin edge_executor","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-21T10:54:03.024246","level":"debug","event":"Importing
entry_point plugin hive","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-21T10:54:03.029750","level":"debug","event":"Importing
entry_point plugin databricks_workflow","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-21T10:54:03.158501","level":"debug","event":"Importing
entry_point plugin edge_executor","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-21T10:54:03.162307","level":"debug","event":"Importing
entry_point plugin openlineage","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-21T10:54:03.166200","level":"debug","event":"Importing
entry_point plugin databricks_workflow","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-21T10:54:03.166627","level":"debug","event":"Loading 5
plugin(s) took 292.47 seconds","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-21T10:54:03.166696","level":"debug","event":"Calling
\'on_starting\' with {\'component\':
<airflow.sdk.execution_time.task_runner.TaskRunnerMarker object at
0xffff996ede50>}","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-21T10:54:03.166726","level":"debug","event":"Hook
impls: []","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-21T10:54:03.166793","level":"debug","event":"Result
from \'on_starting\': []","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-21T10:54:03.168724","level":"info","event":"DAG
bundles loaded:
dags-folder","logger":"airflow.dag_processing.bundles.manager.DagBundlesManager"}
{"timestamp":"2025-02-21T10:54:03.168933","level":"info","event":"Filling up
the DagBag from
/files/dags/dags/connection_from_context.py","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-21T10:54:03.169359","level":"debug","event":"Importing
/files/dags/dags/connection_from_context.py","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-21T10:54:03.203944","level":"debug","event":"Initializing
Providers Manager[hooks]","logger":"airflow.providers_manager"}
{"timestamp":"2025-02-21T10:54:03.205112","level":"debug","event":"Initialization
of Providers Manager[hooks] took 0.00
seconds","logger":"airflow.providers_manager"}
{"timestamp":"2025-02-21T10:54:03.209007","level":"debug","event":"Loaded
DAG <DAG: test_jinja_connection_id>","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-21T10:54:03.209186","level":"debug","event":"DAG file
parsed","file":"dags/connection_from_context.py","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.211330","level":"debug","event":"Sending
request","json":"{\\"conn_id\\":\\"test_jinja_connection_id\\",\\"type\\":\\"GetConnection\\"}\
","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.219970","level":"debug","event":"Sending
request","json":"{\\"conn_id\\":\\"test_jinja_connection_id\\",\\"type\\":\\"GetConnection\\"}\
","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.224551","level":"debug","event":"Sending
request","json":"{\\"conn_id\\":\\"test_jinja_connection_id\\",\\"type\\":\\"GetConnection\\"}\
","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.228672","level":"debug","event":"Sending
request","json":"{\\"conn_id\\":\\"test_jinja_connection_id\\",\\"type\\":\\"GetConnection\\"}\
","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.232573","level":"debug","event":"Sending
request","json":"{\\"conn_id\\":\\"test_jinja_connection_id\\",\\"type\\":\\"GetConnection\\"}\
","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.236222","level":"debug","event":"Sending
request","json":"{\\"conn_id\\":\\"test_jinja_connection_id\\",\\"type\\":\\"GetConnection\\"}\
","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.239892","level":"debug","event":"Sending
request","json":"{\\"conn_id\\":\\"test_jinja_connection_id\\",\\"type\\":\\"GetConnection\\"}\
","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.243030","level":"debug","event":"Sending
request","json":"{\\"rendered_fields\\":{\\"templates_dict\\":null,\\"op_args\\":\\"()\\",\\"op_kwargs\\":{\\"get_conn_type\\":\\"postgres\\",\\"check_host\\":\\"database-1.cxmxicvi57az.us-east-2.rds.amazonaws.com\\",\\"get_schema\\":\\"postgres\\",\\"get_login\\":\\"postgres\\",\\"get_pass\\":\\"postgres\\",\\"get_port\\":\\"5432\\",\\"get_extras\\":\\"\\"}},\\"type\\":\\"SetRenderedFields\\"}\
","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.243176","level":"debug","event":"Calling
\'on_task_instance_running\' with {\'previous_state\':
<TaskInstanceState.QUEUED: \'queued\'>, \'task_instance\':
RuntimeTaskInstance(id=UUID(\'01952824-996d-7608-9c0d-d614630967ca\'),
task_id=\'check_jinja_conn_id\', dag_id=\'test_jinja_connection_id\',
run_id=\'manual__2025-02-21T10:54:02.078910+00:00_gFe4vALV\', try_number=1,
map_index=-1, hostname=\'191aa655be37\', task=<Task(PythonOperator):
check_jinja_conn_id>, max_tries=0, start_date=datetime.datetime(2025, 2, 21,
10, 54, 2, 803340, tzinfo=TzInfo(UTC)))}","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-21T10:54:03.243214","level":"debug","event":"Hook
impls: []","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-21T10:54:03.243245","level":"debug","event":"Result
from \'on_task_instance_running\': []","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-21T10:54:03.260336","level":"warning","event":"PythonOperator.execute
cannot be called outside
TaskInstance!","logger":"airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"}
{"timestamp":"2025-02-21T10:54:03.261079Z","level":"info","event":"CONTEXT:
{\'dag\': <DAG: test_jinja_connection_id>, \'inlets\': [],
\'map_index_template\': None, \'outlets\': [], \'run_id\':
\'manual__2025-02-21T10:54:02.078910+00:00_gFe4vALV\', \'task\':
<Task(PythonOperator): check_jinja_conn_id>, \'task_instance\':
RuntimeTaskInstance(id=UUID(\'01952824-996d-7608-9c0d-d614630967ca\'),
task_id=\'check_jinja_conn_id\', dag_id=\'test_jinja_connection_id\',
run_id=\'manual__2025-02-21T10:54:02.078910+00:00_gFe4vALV\', try_number=1,
map_index=-1, hostname=\'191aa655be37\', task=<Task(PythonOperator):
check_jinja_conn_id>, max_tries=0, start_date=datetime.datetime(2025, 2, 21,
10, 54, 2, 803340, tzinfo=TzInfo(UTC))), \'ti\':
RuntimeTaskInstance(id=UUID(\'01952824-996d-7608-9c0d-d614630967ca\'),
task_id=\'check_jinja_conn_id\', dag_id=\'test_jinja_connection_id\',
run_id=\'manual__2025-02-21T10:54:02.078910+00:00_gFe4vALV\', try_number=1,
map_index=-1, hostname=\'191aa655be37\',
task=<Task(PythonOperator): check_jinja_conn_id>, max_tries=0,
start_date=datetime.datetime(2025, 2, 21, 10, 54, 2, 803340,
tzinfo=TzInfo(UTC))), \'start_date\': datetime.datetime(2025, 2, 21, 10, 54, 2,
803340, tzinfo=TzInfo(UTC)), \'outlet_events\':
<airflow.sdk.execution_time.context.OutletEventAccessors object at
0xffffaedefe50>, \'macros\': <MacrosAccessor (dynamic access to macros)>,
\'params\': {}, \'var\': {\'json\': <VariableAccessor (dynamic access)>,
\'value\': <VariableAccessor (dynamic access)>}, \'conn\': <ConnectionAccessor
(dynamic access)>, \'dag_run\': DagRun(dag_id=\'test_jinja_connection_id\',
run_id=\'manual__2025-02-21T10:54:02.078910+00:00_gFe4vALV\',
logical_date=None, data_interval_start=None, data_interval_end=None,
run_after=datetime.datetime(2025, 2, 21, 10, 54, 2, 78910, tzinfo=TzInfo(UTC)),
start_date=datetime.datetime(2025, 2, 21, 10, 54, 2, 543139,
tzinfo=TzInfo(UTC)), end_date=None, clear_number=0,
run_type=<DagRunType.MANUAL: \'manual\'>, conf={},
external_trigger=True), \'task_instance_key_str\':
\'test_jinja_connection_id__check_jinja_conn_id__manual__2025-02-21T10:54:02.078910+00:00_gFe4vALV\',
\'task_reschedule_count\': 0, \'prev_start_date_success\': <Proxy at
0xffffaf616f80 with factory <function
RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffffaedaa820>>,
\'prev_end_date_success\': <Proxy at 0xffffaee41c40 with factory <function
RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffffaed9f9d0>>,
\'get_conn_type\': \'postgres\', \'check_host\':
\'database-1.cxmxicvi57az.us-east-2.rds.amazonaws.com\', \'get_schema\':
\'postgres\', \'get_login\': \'postgres\', \'get_pass\': \'postgres\',
\'get_port\': \'5432\', \'get_extras\': \'\', \'templates_dict\':
None}","chan":"stdout","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.261222Z","level":"info","event":"The
connection type is: postgres","chan":"stdout","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.261300Z","level":"info","event":"The host
is:
database-1.cxmxicvi57az.us-east-2.rds.amazonaws.com","chan":"stdout","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.261399Z","level":"info","event":"The
schema is: postgres","chan":"stdout","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.261461Z","level":"info","event":"The login
is: postgres","chan":"stdout","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.261520Z","level":"info","event":"The
password is: postgres","chan":"stdout","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.261627Z","level":"info","event":"The port
is: 5432","chan":"stdout","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.261750Z","level":"info","event":"The
extras are:","chan":"stdout","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.261805Z","level":"info","event":"asserting
the connection type","chan":"stdout","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.261847Z","level":"info","event":"asserting
the host","chan":"stdout","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.261909Z","level":"info","event":"asserting
the schema","chan":"stdout","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.261964Z","level":"info","event":"asserting
the login","chan":"stdout","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.262007Z","level":"info","event":"asserting
the port","chan":"stdout","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.261106","level":"info","event":"Done.
Returned value was:
None","logger":"airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"}
{"timestamp":"2025-02-21T10:54:03.261311","level":"debug","event":"Sending
request","json":"{\\"state\\":\\"success\\",\\"end_date\\":\\"2025-02-21T10:54:03.261239Z\\",\\"task_outlets\\":[],\\"outlet_events\\":[],\\"type\\":\\"SucceedTask\\"}\
","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.261350","level":"debug","event":"Running
finalizers","ti":"RuntimeTaskInstance(id=UUID(\'01952824-996d-7608-9c0d-d614630967ca\'),
task_id=\'check_jinja_conn_id\', dag_id=\'test_jinja_connection_id\',
run_id=\'manual__2025-02-21T10:54:02.078910+00:00_gFe4vALV\', try_number=1,
map_index=-1, hostname=\'191aa655be37\', task=<Task(PythonOperator):
check_jinja_conn_id>, max_tries=0, start_date=datetime.datetime(2025, 2, 21,
10, 54, 2, 803340, tzinfo=TzInfo(UTC)))","logger":"task"}
{"timestamp":"2025-02-21T10:54:03.261455","level":"debug","event":"Calling
\'on_task_instance_success\' with {\'previous_state\':
<TaskInstanceState.RUNNING: \'running\'>, \'task_instance\':
RuntimeTaskInstance(id=UUID(\'01952824-996d-7608-9c0d-d614630967ca\'),
task_id=\'check_jinja_conn_id\', dag_id=\'test_jinja_connection_id\',
run_id=\'manual__2025-02-21T10:54:02.078910+00:00_gFe4vALV\', try_number=1,
map_index=-1, hostname=\'191aa655be37\', task=<Task(PythonOperator):
check_jinja_conn_id>, max_tries=0, start_date=datetime.datetime(2025, 2, 21,
10, 54, 2, 803340, tzinfo=TzInfo(UTC)))}","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-21T10:54:03.261482","level":"debug","event":"Hook
impls: []","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-21T10:54:03.261512","level":"debug","event":"Result
from \'on_task_instance_success\': []","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-21T10:54:03.261542","level":"debug","event":"Calling
\'before_stopping\' with {\'component\':
<airflow.sdk.execution_time.task_runner.TaskRunnerMarker object at
0xffffaed97700>}","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-21T10:54:03.261562","level":"debug","event":"Hook
impls: []","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-21T10:54:03.261582","level":"debug","event":"Result
from \'before_stopping\': []","logger":"airflow.listeners.listener"}')]
```