This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 3072feb021f9dd17b17ff39e3649509f45e96d7f Author: Kanthi <[email protected]> AuthorDate: Thu Jan 14 11:27:50 2021 -0500 Add Neo4j hook and operator (#13324) Close: #12873 (cherry picked from commit 1d2977f6a4c67fa6174c79dcdc4e9ee3ce06f1b1) --- CONTRIBUTING.rst | 9 +- INSTALL | 9 +- airflow/providers/neo4j/README.md | 18 ++++ airflow/providers/neo4j/__init__.py | 17 +++ airflow/providers/neo4j/example_dags/__init__.py | 17 +++ .../providers/neo4j/example_dags/example_neo4j.py | 48 +++++++++ airflow/providers/neo4j/hooks/__init__.py | 17 +++ airflow/providers/neo4j/hooks/neo4j.py | 117 +++++++++++++++++++++ airflow/providers/neo4j/operators/__init__.py | 17 +++ airflow/providers/neo4j/operators/neo4j.py | 62 +++++++++++ airflow/providers/neo4j/provider.yaml | 44 ++++++++ .../connections/neo4j.rst | 63 +++++++++++ docs/apache-airflow-providers-neo4j/index.rst | 48 +++++++++ .../operators/neo4j.rst | 50 +++++++++ docs/apache-airflow/concepts.rst | 4 +- docs/apache-airflow/extra-packages-ref.rst | 2 + docs/apache-airflow/start/local.rst | 2 +- docs/spelling_wordlist.txt | 4 + .../run_install_and_test_provider_packages.sh | 4 +- setup.py | 3 + tests/core/test_providers_manager.py | 2 + tests/providers/neo4j/__init__.py | 17 +++ tests/providers/neo4j/hooks/__init__.py | 17 +++ tests/providers/neo4j/hooks/test_neo4j.py | 65 ++++++++++++ tests/providers/neo4j/operators/__init__.py | 17 +++ tests/providers/neo4j/operators/test_neo4j.py | 61 +++++++++++ 26 files changed, 721 insertions(+), 13 deletions(-) diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index ff8c80a..6d0e224 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -578,10 +578,11 @@ async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, 
jenkins, jira, kerberos, kubernetes, ldap, -microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, openfaas, opsgenie, -oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, -redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, -sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv, webhdfs, winrm, yandex, zendesk +microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, +opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole, +rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, +snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv, webhdfs, winrm, +yandex, zendesk .. END EXTRAS HERE diff --git a/INSTALL b/INSTALL index 4ee3f2b..e1ef456 100644 --- a/INSTALL +++ b/INSTALL @@ -103,10 +103,11 @@ async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes, ldap, -microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, openfaas, opsgenie, -oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, -redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, -sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv, webhdfs, winrm, yandex, zendesk +microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, +opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole, +rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, 
singularity, slack, +snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv, webhdfs, winrm, +yandex, zendesk # END EXTRAS HERE diff --git a/airflow/providers/neo4j/README.md b/airflow/providers/neo4j/README.md new file mode 100644 index 0000000..ef14aff --- /dev/null +++ b/airflow/providers/neo4j/README.md @@ -0,0 +1,18 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> diff --git a/airflow/providers/neo4j/__init__.py b/airflow/providers/neo4j/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/airflow/providers/neo4j/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/neo4j/example_dags/__init__.py b/airflow/providers/neo4j/example_dags/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/airflow/providers/neo4j/example_dags/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/neo4j/example_dags/example_neo4j.py b/airflow/providers/neo4j/example_dags/example_neo4j.py new file mode 100644 index 0000000..7d6f2fc --- /dev/null +++ b/airflow/providers/neo4j/example_dags/example_neo4j.py @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example use of Neo4j related operators. +""" + +from airflow import DAG +from airflow.providers.neo4j.operators.neo4j import Neo4jOperator +from airflow.utils.dates import days_ago + +default_args = { + 'owner': 'airflow', +} + +dag = DAG( + 'example_neo4j', + default_args=default_args, + start_date=days_ago(2), + tags=['example'], +) + +# [START run_query_neo4j_operator] + +neo4j_task = Neo4jOperator( + task_id='run_neo4j_query', + neo4j_conn_id='neo4j_conn_id', + sql='MATCH (tom {name: "Tom Hanks"}) RETURN tom', + dag=dag, +) + +# [END run_query_neo4j_operator] + +neo4j_task diff --git a/airflow/providers/neo4j/hooks/__init__.py b/airflow/providers/neo4j/hooks/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/airflow/providers/neo4j/hooks/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/neo4j/hooks/neo4j.py b/airflow/providers/neo4j/hooks/neo4j.py new file mode 100644 index 0000000..d473b01 --- /dev/null +++ b/airflow/providers/neo4j/hooks/neo4j.py @@ -0,0 +1,117 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""This module allows to connect to a Neo4j database.""" + +from neo4j import GraphDatabase, Neo4jDriver, Result + +from airflow.hooks.base import BaseHook +from airflow.models import Connection + + +class Neo4jHook(BaseHook): + """ + Interact with Neo4j. + + Performs a connection to Neo4j and runs the query. 
+ """ + + conn_name_attr = 'neo4j_conn_id' + default_conn_name = 'neo4j_default' + conn_type = 'neo4j' + hook_name = 'Neo4j' + + def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.neo4j_conn_id = conn_id + self.connection = kwargs.pop("connection", None) + self.client = None + self.extras = None + self.uri = None + + def get_conn(self) -> Neo4jDriver: + """ + Function that initiates a new Neo4j connection + with username, password and database schema. + """ + self.connection = self.get_connection(self.neo4j_conn_id) + self.extras = self.connection.extra_dejson.copy() + + self.uri = self.get_uri(self.connection) + self.log.info('URI: %s', self.uri) + + if self.client is not None: + return self.client + + is_encrypted = self.connection.extra_dejson.get('encrypted', False) + + self.client = GraphDatabase.driver( + self.uri, auth=(self.connection.login, self.connection.password), encrypted=is_encrypted + ) + + return self.client + + def get_uri(self, conn: Connection) -> str: + """ + Build the uri based on extras + - Default - uses bolt scheme(bolt://) + - neo4j_scheme - neo4j:// + - certs_self_signed - neo4j+ssc:// + - certs_trusted_ca - neo4j+s:// + :param conn: connection object. + :return: uri + """ + use_neo4j_scheme = conn.extra_dejson.get('neo4j_scheme', False) + scheme = 'neo4j' if use_neo4j_scheme else 'bolt' + + # Self signed certificates + ssc = conn.extra_dejson.get('certs_self_signed', False) + + # Only certificates signed by CA. 
+ trusted_ca = conn.extra_dejson.get('certs_trusted_ca', False) + encryption_scheme = '' + + if ssc: + encryption_scheme = '+ssc' + elif trusted_ca: + encryption_scheme = '+s' + + return '{scheme}{encryption_scheme}://{host}:{port}'.format( + scheme=scheme, + encryption_scheme=encryption_scheme, + host=conn.host, + port='7687' if conn.port is None else f'{conn.port}', + ) + + def run(self, query) -> Result: + """ + Function to create a neo4j session + and execute the query in the session. + + + :param query: Neo4j query + :return: Result + """ + driver = self.get_conn() + if not self.connection.schema: + with driver.session() as session: + result = session.run(query) + else: + with driver.session(database=self.connection.schema) as session: + result = session.run(query) + return result diff --git a/airflow/providers/neo4j/operators/__init__.py b/airflow/providers/neo4j/operators/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/airflow/providers/neo4j/operators/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/airflow/providers/neo4j/operators/neo4j.py b/airflow/providers/neo4j/operators/neo4j.py new file mode 100644 index 0000000..20df9cb --- /dev/null +++ b/airflow/providers/neo4j/operators/neo4j.py @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import Dict, Iterable, Mapping, Optional, Union + +from airflow.models import BaseOperator +from airflow.providers.neo4j.hooks.neo4j import Neo4jHook +from airflow.utils.decorators import apply_defaults + + +class Neo4jOperator(BaseOperator): + """ + Executes sql code in a specific Neo4j database + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:Neo4jOperator` + + :param sql: the sql code to be executed. 
Can receive a str representing a + sql statement, a list of str (sql statements) + :type sql: str or list[str] + :param neo4j_conn_id: reference to a specific Neo4j database + :type neo4j_conn_id: str + """ + + @apply_defaults + def __init__( + self, + *, + sql: str, + neo4j_conn_id: str = 'neo4j_default', + parameters: Optional[Union[Mapping, Iterable]] = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.neo4j_conn_id = neo4j_conn_id + self.sql = sql + self.parameters = parameters + self.hook = None + + def get_hook(self): + """Function to retrieve the Neo4j Hook.""" + return Neo4jHook(conn_id=self.neo4j_conn_id) + + def execute(self, context: Dict) -> None: + self.log.info('Executing: %s', self.sql) + self.hook = self.get_hook() + self.hook.run(self.sql) diff --git a/airflow/providers/neo4j/provider.yaml b/airflow/providers/neo4j/provider.yaml new file mode 100644 index 0000000..9081694 --- /dev/null +++ b/airflow/providers/neo4j/provider.yaml @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +--- +package-name: apache-airflow-providers-neo4j +name: Neo4j +description: | + `Neo4j <https://neo4j.com/>`__ + +versions: + - 1.0.0 +integrations: + - integration-name: Neo4j + external-doc-url: https://neo4j.com/ + how-to-guide: + - /docs/apache-airflow-providers-neo4j/operators/neo4j.rst + tags: [software] + +operators: + - integration-name: Neo4j + python-modules: + - airflow.providers.neo4j.operators.neo4j + +hooks: + - integration-name: Neo4j + python-modules: + - airflow.providers.neo4j.hooks.neo4j + +hook-class-names: + - airflow.providers.neo4j.hooks.neo4j.Neo4jHook diff --git a/docs/apache-airflow-providers-neo4j/connections/neo4j.rst b/docs/apache-airflow-providers-neo4j/connections/neo4j.rst new file mode 100644 index 0000000..33fd6b5 --- /dev/null +++ b/docs/apache-airflow-providers-neo4j/connections/neo4j.rst @@ -0,0 +1,63 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +Neo4j Connection +================ +The Neo4j connection type provides a connection to a Neo4j database. + +Configuring the Connection +-------------------------- +Host (required) + The host to connect to. + +Schema (optional) + Specify the name of the Neo4j database to run queries against. + +Login (required) + Specify the user name to connect.
+ +Password (required) + Specify the password to connect. + +Extra (optional) + Specify the extra parameters (as a JSON dictionary) that can be used in Neo4j + connection. + + The following extras are supported: + + - Default - uses bolt scheme (bolt://) + - neo4j_scheme - neo4j:// + - certs_self_signed - neo4j+ssc:// + - certs_trusted_ca - neo4j+s:// + + * ``encrypted``: Sets encrypted=True/False for GraphDatabase.driver; set to ``True`` for Neo4j Aura. + * ``neo4j_scheme``: Sets the URI scheme to ``neo4j://``; the default is ``bolt://`` + * ``certs_self_signed``: Sets the URI scheme to support self-signed certificates (``neo4j+ssc://``) + * ``certs_trusted_ca``: Sets the URI scheme to support only certificates signed by a trusted CA (``neo4j+s://``) + + Example "extras" field: + + .. code-block:: json + + { + "encrypted": true, + "neo4j_scheme": true, + "certs_self_signed": true, + "certs_trusted_ca": false + } diff --git a/docs/apache-airflow-providers-neo4j/index.rst b/docs/apache-airflow-providers-neo4j/index.rst new file mode 100644 index 0000000..cafc57b --- /dev/null +++ b/docs/apache-airflow-providers-neo4j/index.rst @@ -0,0 +1,48 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License.
+ +``apache-airflow-providers-neo4j`` +================================== + +Content +------- + +.. toctree:: + :maxdepth: 1 + :caption: Guides + + Connection types <connections/neo4j> + Operators <operators/neo4j> + +.. toctree:: + :maxdepth: 1 + :caption: References + + Python API <_api/airflow/providers/neo4j/index> + +.. toctree:: + :maxdepth: 1 + :caption: Resources + + Example DAGs <https://github.com/apache/airflow/tree/master/airflow/providers/neo4j/example_dags> + +.. toctree:: + :maxdepth: 1 + :caption: Resources + + PyPI Repository <https://pypi.org/project/apache-airflow-providers-neo4j/> diff --git a/docs/apache-airflow-providers-neo4j/operators/neo4j.rst b/docs/apache-airflow-providers-neo4j/operators/neo4j.rst new file mode 100644 index 0000000..411aa0c --- /dev/null +++ b/docs/apache-airflow-providers-neo4j/operators/neo4j.rst @@ -0,0 +1,50 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +.. _howto/operator:Neo4jOperator: + +Neo4jOperator +============= + +Use the :class:`~airflow.providers.neo4j.operators.neo4j.Neo4jOperator` to execute +Cypher queries in a `Neo4j <https://neo4j.com/>`__ database.
+ + +Using the Operator +^^^^^^^^^^^^^^^^^^ + +Use the ``neo4j_conn_id`` argument to connect to your Neo4j instance where +the connection metadata is structured as follows: + +.. list-table:: Neo4j Airflow Connection Metadata + :widths: 25 25 + :header-rows: 1 + + * - Parameter + - Input + * - Host: string + - Neo4j hostname + * - Schema: string + - Database name + * - Login: string + - Neo4j user + * - Password: string + - Neo4j user password + * - Port: int + - Neo4j port diff --git a/docs/apache-airflow/concepts.rst b/docs/apache-airflow/concepts.rst index 346f6c0..0522c0f 100644 --- a/docs/apache-airflow/concepts.rst +++ b/docs/apache-airflow/concepts.rst @@ -1321,8 +1321,8 @@ In case of DAG and task policies users may raise :class:`~airflow.exceptions.Air to prevent a DAG from being imported or prevent a task from being executed if the task is not compliant with users' check. -Please note, cluster policy will have precedence over task attributes defined in DAG meaning -if ``task.sla`` is defined in dag and also mutated via cluster policy then later will have precedence. +Please note, cluster policy will have precedence over task attributes defined in DAG meaning that +if ``task.sla`` is defined in dag and also mutated via cluster policy then the latter will have precedence. In next sections we show examples of each type of cluster policy. 
diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst index c565a93..b2549ae 100644 --- a/docs/apache-airflow/extra-packages-ref.rst +++ b/docs/apache-airflow/extra-packages-ref.rst @@ -213,6 +213,8 @@ Those are extras that add dependencies needed for integration with other softwar +---------------------+-----------------------------------------------------+-------------------------------------------+ | mysql | ``pip install 'apache-airflow[mysql]'`` | MySQL operators and hook | +---------------------+-----------------------------------------------------+-------------------------------------------+ +| neo4j | ``pip install 'apache-airflow[neo4j]'`` | Neo4j operators and hook | ++---------------------+-----------------------------------------------------+-------------------------------------------+ | odbc | ``pip install 'apache-airflow[odbc]'`` | ODBC data sources including MS SQL Server | +---------------------+-----------------------------------------------------+-------------------------------------------+ | openfaas | ``pip install 'apache-airflow[openfaas]'`` | OpenFaaS hooks | diff --git a/docs/apache-airflow/start/local.rst b/docs/apache-airflow/start/local.rst index 7b0bb33..64aaa7a 100644 --- a/docs/apache-airflow/start/local.rst +++ b/docs/apache-airflow/start/local.rst @@ -86,7 +86,7 @@ the ``Admin->Configuration`` menu. The PID file for the webserver will be stored in ``$AIRFLOW_HOME/airflow-webserver.pid`` or in ``/run/airflow/webserver.pid`` if started by systemd. -Out of the box, Airflow uses a sqlite database, which you should outgrow +Out of the box, Airflow uses a SQLite database, which you should outgrow fairly quickly since no parallelization is possible using this database backend. 
It works in conjunction with the :class:`~airflow.executors.sequential_executor.SequentialExecutor` which will diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index c541f06..db4342a 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -252,6 +252,7 @@ NaN Naik Namenode Namespace +Neo4j Nextdoor Nones NotFound @@ -992,6 +993,8 @@ navbar nd ndjson neighbours +neo +neo4j neq networkUri nginx @@ -1219,6 +1222,7 @@ sqlsensor sqoop src srv +ssc ssd sshHook sshtunnel diff --git a/scripts/in_container/run_install_and_test_provider_packages.sh b/scripts/in_container/run_install_and_test_provider_packages.sh index b3ee63b..969fa29 100755 --- a/scripts/in_container/run_install_and_test_provider_packages.sh +++ b/scripts/in_container/run_install_and_test_provider_packages.sh @@ -95,7 +95,7 @@ function discover_all_provider_packages() { # Columns is to force it wider, so it doesn't wrap at 80 characters COLUMNS=180 airflow providers list - local expected_number_of_providers=61 + local expected_number_of_providers=62 local actual_number_of_providers actual_providers=$(airflow providers list --output yaml | grep package_name) actual_number_of_providers=$(wc -l <<<"$actual_providers") @@ -118,7 +118,7 @@ function discover_all_hooks() { group_start "Listing available hooks via 'airflow providers hooks'" COLUMNS=180 airflow providers hooks - local expected_number_of_hooks=59 + local expected_number_of_hooks=60 local actual_number_of_hooks actual_number_of_hooks=$(airflow providers hooks --output table | grep -c "| apache" | xargs) if [[ ${actual_number_of_hooks} != "${expected_number_of_hooks}" ]]; then diff --git a/setup.py b/setup.py index e967781..210b12f 100644 --- a/setup.py +++ b/setup.py @@ -360,6 +360,7 @@ mysql = [ 'mysql-connector-python>=8.0.11, <=8.0.22', 'mysqlclient>=1.3.6,<1.4', ] +neo4j = ['neo4j>=4.2.1'] odbc = [ 'pyodbc', ] @@ -557,6 +558,7 @@ PROVIDERS_REQUIREMENTS: Dict[str, List[str]] = { 'microsoft.winrm': winrm, 
'mongo': mongo, 'mysql': mysql, + 'neo4j': neo4j, 'odbc': odbc, 'openfaas': [], 'opsgenie': [], @@ -711,6 +713,7 @@ ALL_DB_PROVIDERS = [ 'microsoft.mssql', 'mongo', 'mysql', + 'neo4j', 'postgres', 'presto', 'vertica', diff --git a/tests/core/test_providers_manager.py b/tests/core/test_providers_manager.py index 4c03984..7d80c58 100644 --- a/tests/core/test_providers_manager.py +++ b/tests/core/test_providers_manager.py @@ -57,6 +57,7 @@ ALL_PROVIDERS = [ 'apache-airflow-providers-microsoft-winrm', 'apache-airflow-providers-mongo', 'apache-airflow-providers-mysql', + 'apache-airflow-providers-neo4j', 'apache-airflow-providers-odbc', 'apache-airflow-providers-openfaas', 'apache-airflow-providers-opsgenie', @@ -122,6 +123,7 @@ CONNECTIONS_LIST = [ 'mongo', 'mssql', 'mysql', + 'neo4j', 'odbc', 'oracle', 'pig_cli', diff --git a/tests/providers/neo4j/__init__.py b/tests/providers/neo4j/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/tests/providers/neo4j/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/tests/providers/neo4j/hooks/__init__.py b/tests/providers/neo4j/hooks/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/tests/providers/neo4j/hooks/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/neo4j/hooks/test_neo4j.py b/tests/providers/neo4j/hooks/test_neo4j.py new file mode 100644 index 0000000..7f64fc4 --- /dev/null +++ b/tests/providers/neo4j/hooks/test_neo4j.py @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# +import json +import unittest +from unittest import mock + +from airflow.models import Connection +from airflow.providers.neo4j.hooks.neo4j import Neo4jHook + + +class TestNeo4jHookConn(unittest.TestCase): + def setUp(self): + super().setUp() + self.neo4j_hook = Neo4jHook() + self.connection = Connection( + conn_type='neo4j', login='login', password='password', host='host', schema='schema' + ) + + def test_get_uri_neo4j_scheme(self): + + self.neo4j_hook.get_connection = mock.Mock() + self.neo4j_hook.get_connection.return_value = self.connection + uri = self.neo4j_hook.get_uri(self.connection) + + self.assertEqual(uri, "bolt://host:7687") + + def test_get_uri_bolt_scheme(self): + + self.connection.extra = json.dumps({"bolt_scheme": True}) + self.neo4j_hook.get_connection = mock.Mock() + self.neo4j_hook.get_connection.return_value = self.connection + uri = self.neo4j_hook.get_uri(self.connection) + + self.assertEqual(uri, "bolt://host:7687") + + def test_get_uri_bolt_ssc_scheme(self): + self.connection.extra = json.dumps({"certs_self_signed": True, "bolt_scheme": True}) + self.neo4j_hook.get_connection = mock.Mock() + self.neo4j_hook.get_connection.return_value = self.connection + uri = self.neo4j_hook.get_uri(self.connection) + + self.assertEqual(uri, "bolt+ssc://host:7687") + + def test_get_uri_bolt_trusted_ca_scheme(self): + self.connection.extra = json.dumps({"certs_trusted_ca": True, "bolt_scheme": True}) + self.neo4j_hook.get_connection = mock.Mock() + self.neo4j_hook.get_connection.return_value = self.connection + uri = self.neo4j_hook.get_uri(self.connection) + + self.assertEqual(uri, "bolt+s://host:7687") diff --git a/tests/providers/neo4j/operators/__init__.py b/tests/providers/neo4j/operators/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/tests/providers/neo4j/operators/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the 
Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/neo4j/operators/test_neo4j.py b/tests/providers/neo4j/operators/test_neo4j.py new file mode 100644 index 0000000..39c8d69 --- /dev/null +++ b/tests/providers/neo4j/operators/test_neo4j.py @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import unittest +from unittest import mock + +from airflow.models.dag import DAG +from airflow.providers.neo4j.operators.neo4j import Neo4jOperator +from airflow.utils import timezone + +DEFAULT_DATE = timezone.datetime(2015, 1, 1) +DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat() +DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10] +TEST_DAG_ID = 'unit_test_dag' + + +class TestNeo4jOperator(unittest.TestCase): + def setUp(self): + args = {'owner': 'airflow', 'start_date': DEFAULT_DATE} + dag = DAG(TEST_DAG_ID, default_args=args) + self.dag = dag + + @mock.patch('airflow.providers.neo4j.operators.neo4j.Neo4jOperator.get_hook') + def test_neo4j_operator_test(self, mock_hook): + + sql = """ + MATCH (tom {name: "Tom Hanks"}) RETURN tom + """ + op = Neo4jOperator(task_id='basic_neo4j', sql=sql, dag=self.dag) + op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
