rmidhun23 opened a new pull request, #52990:
URL: https://github.com/apache/airflow/pull/52990

   Closes: #52723
   
   Following up with a PR to fix the issue.
   
   # Why
   
   1. When using the `Neo4jOperator` to execute Cypher queries, the 
`parameters` argument is not passed through: the operator is designed 
to accept a `parameters` argument, but the argument is never used.
   
   2. The `Neo4jOperator` should forward the `parameters` argument to 
the Cypher query execution, enabling dynamic queries that bind values 
at runtime.
   
   # How
   
   1. Update `Neo4jHook` to accept `parameters` when the operator is executed.
   
   2. Pass the `parameters` argument to the `Neo4j` driver session for both 
schema-qualified and schemaless queries.
   
   3. Add tests for coverage.
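   The change can be sketched as follows. This is an illustrative, self-contained sketch with a stand-in session object, not the provider's actual code; the real hook ultimately calls the `neo4j` Python driver's `Session.run`:

   ```python
   # Illustrative sketch only: FakeSession stands in for the neo4j driver's
   # Session so the parameter-forwarding logic can run anywhere.
   from typing import Any, Optional


   class FakeSession:
       """Records the query and parameters it is asked to run."""

       def run(self, query: str, parameters: Optional[dict[str, Any]] = None):
           self.last_query = query
           self.last_parameters = parameters or {}
           return self.last_parameters


   def run_query(session, query, parameters=None):
       # Before the fix, `parameters` was accepted but dropped at this step;
       # the fix forwards it so $name-style placeholders bind at runtime.
       return session.run(query, parameters=parameters)


   session = FakeSession()
   run_query(session, "MERGE (a:Person {name: $name})", parameters={"name": "Airflow"})
   ```

   Forwarding the dict is the whole of the behavioral change; the query text itself is untouched, so existing parameterless queries keep working.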
   
   ## Deployment Details
   
   ```text
   OS: macOS Sequoia Version 15.4.1 (24E263)
   Docker: 20.10.14
   Kubernetes: (Client Version: v1.22.5, Server Version: v1.33.1)
   kind: v0.29.0 go1.24.2 darwin/arm64
   
   Airflow: 2.10.5
   Helm Chart: airflow-1.16.0 (2.10.5)
   
   neo4j: 2025.04.0
   Helm Chart: neo4j-2025.4.0  App Version: 2025.04.0  
   ```
   
   ```python
   """
   Dag to fix the neo4j provider parameter issue in Airflow.
   """
   
   from datetime import datetime

   from airflow.decorators import dag
   from airflow.models import Variable
   from airflow.operators.empty import EmptyOperator
   from airflow.providers.neo4j.operators.neo4j import Neo4jOperator

   ## .... (connection setup helpers elided)
   
   
   # Define the default arguments
   default_args = {
       "owner": "airflow",
       "start_date": datetime(2025, 7, 5),
       "retries": 3,
   }
   
   
   # Define the DAG
   @dag(
       dag_id="oss_contrib_airflow_neo4j_pipeline",
       description="Neo4j provider sample",
       default_args=default_args,
        schedule=None,
       start_date=datetime(2025, 7, 5),
       catchup=False,
       tags=["neo4j provider"],
   )
   def oss_contrib_airflow_neo4j_pipeline():
       """
       DAG to test parameters binding using Neo4jProvider in Airflow.
       """
   
       ## connection config stored in an Airflow Variable
       neo4j_bolt_vars = Variable.get("bolt_neo4j", default_var="{}", 
deserialize_json=True)
   
       start = EmptyOperator(task_id="start")
   
       update_graph_db = Neo4jOperator(
           task_id="upd_graph_db",
           parameters={"name": "Airflow"},
           neo4j_conn_id=neo4j_bolt_vars.get("conn_id", "bolt_neo4j"),
           sql="""
             // schema neo4j
             MERGE (a:Person {name: $name})
             ON CREATE SET a.created = timestamp()
             ON MATCH SET a.updated = timestamp()
           """,
       )
   
       end = EmptyOperator(task_id="end")
   
       start >> update_graph_db >> end
   
   
   DAG_INSTANCE = oss_contrib_airflow_neo4j_pipeline()
   ```
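   For reference, the `bolt_neo4j` Variable read above is assumed to hold a small JSON object; the DAG only consumes its `conn_id` key, falling back to `"bolt_neo4j"`. The value below is illustrative, not taken from a real deployment:

   ```python
   import json

   # Illustrative value of the "bolt_neo4j" Airflow Variable; with
   # Variable.get(..., deserialize_json=True) Airflow returns the parsed dict.
   raw_variable = '{"conn_id": "bolt_neo4j"}'

   neo4j_bolt_vars = json.loads(raw_variable)
   conn_id = neo4j_bolt_vars.get("conn_id", "bolt_neo4j")
   ```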
   
   ## Result
   
   ```text
   Connected to Neo4j using Bolt protocol version 5.8 at neo4j://localhost:7687.
   Type :help for a list of available commands or :exit to exit the shell.
   Note that Cypher queries must end with a semicolon.
   @neo4j> MATCH (a:Person) RETURN a;
   +-----------------------------------------------------+
   | a                                                   |
   +-----------------------------------------------------+
   | (:Person {name: "Airflow", created: 1751911510657}) |
   +-----------------------------------------------------+
   
   1 row
   ready to start consuming query after 76 ms, results consumed after another 3 
ms
   @neo4j> 
   ```
   
   <details>
     <summary>Logs</summary>
   
   ```
   [2025-07-07T18:04:48.747+0000] {dagbag.py:588} INFO - Filling up the DagBag 
from /opt/airflow/dags/oss_contrib_airflow_neo4j.py
   <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
   <jemalloc>: (This is the expected behaviour if you are running under QEMU)
   [2025-07-07T18:05:07.069+0000] {oss_contrib_airflow_neo4j.py:50} INFO - 
create_or_update_connection: Connection bolt_neo4j already exsits:
   [2025-07-07T18:05:07.498+0000] {task_command.py:467} INFO - Running 
<TaskInstance: oss_contrib_airflow_neo4j_pipeline.upd_graph_db 
manual__2025-07-07T17:09:15.242788+00:00 [queued]> on host 
oss-contrib-airflow-neo4j-pipeline-upd-graph-db-zh43aw29
   [2025-07-07T18:05:08.102+0000] {local_task_job_runner.py:123} INFO - 
::group::Pre task execution logs
   [2025-07-07T18:05:08.305+0000] {taskinstance.py:2614} INFO - Dependencies 
all met for dep_context=non-requeueable deps ti=<TaskInstance: 
oss_contrib_airflow_neo4j_pipeline.upd_graph_db 
manual__2025-07-07T17:09:15.242788+00:00 [queued]>
   [2025-07-07T18:05:08.373+0000] {taskinstance.py:2614} INFO - Dependencies 
all met for dep_context=requeueable deps ti=<TaskInstance: 
oss_contrib_airflow_neo4j_pipeline.upd_graph_db 
manual__2025-07-07T17:09:15.242788+00:00 [queued]>
   [2025-07-07T18:05:08.375+0000] {taskinstance.py:2867} INFO - Starting 
attempt 4 of 6
   [2025-07-07T18:05:08.800+0000] {taskinstance.py:2890} INFO - Executing 
<Task(Neo4jOperator): upd_graph_db> on 2025-07-07 17:09:15.242788+00:00
   [2025-07-07T18:05:08.843+0000] {standard_task_runner.py:72} INFO - Started 
process 41 to run task
   [2025-07-07T18:05:08.851+0000] {standard_task_runner.py:104} INFO - Running: 
['airflow', 'tasks', 'run', 'oss_contrib_airflow_neo4j_pipeline', 
'upd_graph_db', 'manual__2025-07-07T17:09:15.242788+00:00', '--job-id', '236', 
'--raw', '--subdir', 'DAGS_FOLDER/oss_contrib_airflow_neo4j.py', '--cfg-path', 
'/tmp/tmpfututloq']
   [2025-07-07T18:05:08.857+0000] {standard_task_runner.py:105} INFO - Job 236: 
Subtask upd_graph_db
   [2025-07-07T18:05:09.447+0000] {task_command.py:467} INFO - Running 
<TaskInstance: oss_contrib_airflow_neo4j_pipeline.upd_graph_db 
manual__2025-07-07T17:09:15.242788+00:00 [running]> on host 
oss-contrib-airflow-neo4j-pipeline-upd-graph-db-zh43aw29
   [2025-07-07T18:05:10.175+0000] {pod_generator.py:472} WARNING - Model file 
/opt/airflow/pod_templates/pod_template_file.yaml does not exist
   [2025-07-07T18:05:10.526+0000] {taskinstance.py:3134} INFO - Exporting env 
vars: AIRFLOW_CTX_DAG_OWNER='airflow' 
AIRFLOW_CTX_DAG_ID='oss_contrib_airflow_neo4j_pipeline' 
AIRFLOW_CTX_TASK_ID='upd_graph_db' 
AIRFLOW_CTX_EXECUTION_DATE='2025-07-07T17:09:15.242788+00:00' 
AIRFLOW_CTX_TRY_NUMBER='4' 
AIRFLOW_CTX_DAG_RUN_ID='manual__2025-07-07T17:09:15.242788+00:00'
   [2025-07-07T18:05:10.531+0000] {taskinstance.py:732} INFO - ::endgroup::
   [2025-07-07T18:05:10.568+0000] {neo4j.py:159} INFO - Executing:
   USE neo4j
   MERGE (a:Person {name: $name})
   ON CREATE SET a.created = timestamp()
   ON MATCH SET a.updated = timestamp()
   [2025-07-07T18:05:10.570+0000] {neo4j.py:160} INFO - Parameters: {'name': 
'Airflow'}
   [2025-07-07T18:05:10.576+0000] {neo4j.py:113} INFO - Run: {'name': 'Airflow'}
   [2025-07-07T18:05:10.596+0000] {base.py:84} INFO - Retrieving connection 
'bolt_neo4j'
   [2025-07-07T18:05:10.604+0000] {neo4j.py:50} INFO - URI: 
bolt://neo4j.neo4j.svc.cluster.local:7687
   [2025-07-07T18:05:10.940+0000] {taskinstance.py:341} INFO - ::group::Post 
task execution logs
   [2025-07-07T18:05:10.945+0000] {taskinstance.py:353} INFO - Marking task as 
SUCCESS. dag_id=oss_contrib_airflow_neo4j_pipeline, task_id=upd_graph_db, 
run_id=manual__2025-07-07T17:09:15.242788+00:00, 
execution_date=20250707T170915, start_date=20250707T180508, 
end_date=20250707T180510
   [2025-07-07T18:05:11.372+0000] {local_task_job_runner.py:266} INFO - Task 
exited with return code 0
   [2025-07-07T18:05:11.524+0000] {local_task_job_runner.py:245} INFO - 
::endgroup::
   ```
   </details>
   
   ## Dag UI
   