This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5d05835e1d54c87f3a05b5c2bd5f1850b8506dfc Author: Marcos Marx <[email protected]> AuthorDate: Sat Mar 6 11:19:30 2021 -0300 Adds new Airbyte provider (#14492) This commit add hook, operators and sensors to interact with Airbyte external service. (cherry picked from commit 20b72aea4dc1e25f2aa3cfe62b45ca1ff29d1cbb) --- CONTRIBUTING.rst | 23 ++-- INSTALL | 22 ++-- airflow/providers/airbyte/CHANGELOG.rst | 25 ++++ airflow/providers/airbyte/__init__.py | 17 +++ airflow/providers/airbyte/example_dags/__init__.py | 16 +++ .../example_dags/example_airbyte_trigger_job.py | 64 +++++++++++ airflow/providers/airbyte/hooks/__init__.py | 17 +++ airflow/providers/airbyte/hooks/airbyte.py | 109 ++++++++++++++++++ airflow/providers/airbyte/operators/__init__.py | 17 +++ airflow/providers/airbyte/operators/airbyte.py | 85 ++++++++++++++ airflow/providers/airbyte/provider.yaml | 51 +++++++++ airflow/providers/airbyte/sensors/__init__.py | 16 +++ airflow/providers/airbyte/sensors/airbyte.py | 73 ++++++++++++ airflow/providers/dependencies.json | 3 + docs/apache-airflow-providers-airbyte/commits.rst | 27 +++++ .../connections.rst | 36 ++++++ docs/apache-airflow-providers-airbyte/index.rst | 121 ++++++++++++++++++++ .../operators/airbyte.rst | 58 ++++++++++ docs/apache-airflow/extra-packages-ref.rst | 2 + docs/integration-logos/airbyte/Airbyte.png | Bin 0 -> 7405 bytes docs/spelling_wordlist.txt | 2 + setup.py | 1 + tests/core/test_providers_manager.py | 1 + tests/providers/airbyte/__init__.py | 16 +++ tests/providers/airbyte/hooks/__init__.py | 16 +++ tests/providers/airbyte/hooks/test_airbyte.py | 126 +++++++++++++++++++++ tests/providers/airbyte/operators/__init__.py | 16 +++ tests/providers/airbyte/operators/test_airbyte.py | 55 +++++++++ tests/providers/airbyte/sensors/__init__.py | 16 +++ tests/providers/airbyte/sensors/test_airbyte.py | 93 +++++++++++++++ 30 files changed, 1102 insertions(+), 22 deletions(-) diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 
e82fd4e..7ac115c 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -585,17 +585,17 @@ This is the full list of those extras: .. START EXTRAS HERE -all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid, apache.hdfs, -apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, -apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes, -crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, -docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google, -google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes, -ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, -opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole, -rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, -snowflake, spark, sqlite, ssh, statsd, tableau, telegram, trino, vertica, virtualenv, webhdfs, -winrm, yandex, zendesk +airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid, +apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, +apache.sqoop, apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, +cncf.kubernetes, crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, +dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, +github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, +jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, +mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, +postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, 
sentry, +sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, trino, vertica, +virtualenv, webhdfs, winrm, yandex, zendesk .. END EXTRAS HERE @@ -653,6 +653,7 @@ Here is the list of packages and their extras: ========================== =========================== Package Extras ========================== =========================== +airbyte http amazon apache.hive,google,imap,mongo,mysql,postgres,ssh apache.beam google apache.druid apache.hive diff --git a/INSTALL b/INSTALL index 34fccd2..46d15f6 100644 --- a/INSTALL +++ b/INSTALL @@ -97,17 +97,17 @@ The list of available extras: # START EXTRAS HERE -all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid, apache.hdfs, -apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, -apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes, -crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, -docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google, -google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes, -ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, -opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole, -rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, -snowflake, spark, sqlite, ssh, statsd, tableau, telegram, trino, vertica, virtualenv, webhdfs, -winrm, yandex, zendesk +airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid, +apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, +apache.sqoop, apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, +cncf.kubernetes, crypto, dask, databricks, datadog, devel, devel_all, devel_ci, 
devel_hadoop, +dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, +github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, +jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, +mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, +postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, +sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, trino, vertica, +virtualenv, webhdfs, winrm, yandex, zendesk # END EXTRAS HERE diff --git a/airflow/providers/airbyte/CHANGELOG.rst b/airflow/providers/airbyte/CHANGELOG.rst new file mode 100644 index 0000000..cef7dda --- /dev/null +++ b/airflow/providers/airbyte/CHANGELOG.rst @@ -0,0 +1,25 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Changelog +--------- + +1.0.0 +..... + +Initial version of the provider. 
diff --git a/airflow/providers/airbyte/__init__.py b/airflow/providers/airbyte/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/airflow/providers/airbyte/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/airbyte/example_dags/__init__.py b/airflow/providers/airbyte/example_dags/__init__.py new file mode 100644 index 0000000..13a8339 --- /dev/null +++ b/airflow/providers/airbyte/example_dags/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py b/airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py new file mode 100644 index 0000000..1ac62a8 --- /dev/null +++ b/airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator.""" + +from datetime import timedelta + +from airflow import DAG +from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator +from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor +from airflow.utils.dates import days_ago + +args = { + 'owner': 'airflow', +} + +with DAG( + dag_id='example_airbyte_operator', + default_args=args, + schedule_interval=None, + start_date=days_ago(1), + dagrun_timeout=timedelta(minutes=60), + tags=['example'], +) as dag: + + # [START howto_operator_airbyte_synchronous] + sync_source_destination = AirbyteTriggerSyncOperator( + task_id='airbyte_sync_source_dest_example', + airbyte_conn_id='airbyte_default', + connection_id='15bc3800-82e4-48c3-a32d-620661273f28', + ) + # [END howto_operator_airbyte_synchronous] + + # [START howto_operator_airbyte_asynchronous] + async_source_destination = AirbyteTriggerSyncOperator( + task_id='airbyte_async_source_dest_example', + airbyte_conn_id='airbyte_default', + connection_id='15bc3800-82e4-48c3-a32d-620661273f28', + asynchronous=True, + ) + + airbyte_sensor = AirbyteJobSensor( + task_id='airbyte_sensor_source_dest_example', + airbyte_job_id="{{task_instance.xcom_pull(task_ids='airbyte_async_source_dest_example')}}", + airbyte_conn_id='airbyte_default', + ) + # [END howto_operator_airbyte_asynchronous] + + async_source_destination >> airbyte_sensor diff --git a/airflow/providers/airbyte/hooks/__init__.py b/airflow/providers/airbyte/hooks/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/airflow/providers/airbyte/hooks/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py new file mode 100644 index 0000000..0aeb4f8 --- /dev/null +++ b/airflow/providers/airbyte/hooks/airbyte.py @@ -0,0 +1,109 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import time +from typing import Any, Optional + +from airflow.exceptions import AirflowException +from airflow.providers.http.hooks.http import HttpHook + + +class AirbyteHook(HttpHook): + """ + Hook for Airbyte API + + :param airbyte_conn_id: Required. The name of the Airflow connection to get + connection information for Airbyte. + :type airbyte_conn_id: str + :param api_version: Optional. 
Airbyte API version. + :type api_version: str + """ + + RUNNING = "running" + SUCCEEDED = "succeeded" + CANCELLED = "cancelled" + PENDING = "pending" + FAILED = "failed" + ERROR = "error" + + def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: Optional[str] = "v1") -> None: + super().__init__(http_conn_id=airbyte_conn_id) + self.api_version: str = api_version + + def wait_for_job( + self, job_id: str, wait_seconds: Optional[float] = 3, timeout: Optional[float] = 3600 + ) -> None: + """ + Helper method which polls a job to check if it finishes. + + :param job_id: Required. Id of the Airbyte job + :type job_id: str + :param wait_seconds: Optional. Number of seconds between checks. + :type wait_seconds: float + :param timeout: Optional. How many seconds wait for job to be ready. + Used only if ``asynchronous`` is False. + :type timeout: float + """ + state = None + start = time.monotonic() + while True: + if timeout and start + timeout < time.monotonic(): + raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s") + time.sleep(wait_seconds) + try: + job = self.get_job(job_id=job_id) + state = job.json()["job"]["status"] + except AirflowException as err: + self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err) + continue + + if state in (self.RUNNING, self.PENDING): + continue + if state == self.SUCCEEDED: + break + if state == self.ERROR: + raise AirflowException(f"Job failed:\n{job}") + elif state == self.CANCELLED: + raise AirflowException(f"Job was cancelled:\n{job}") + else: + raise Exception(f"Encountered unexpected state `{state}` for job_id `{job_id}`") + + def submit_sync_connection(self, connection_id: str) -> Any: + """ + Submits a job to a Airbyte server. + + :param connection_id: Required. The ConnectionId of the Airbyte Connection. 
+ :type connectiond_id: str + """ + return self.run( + endpoint=f"api/{self.api_version}/connections/sync", + json={"connectionId": connection_id}, + headers={"accept": "application/json"}, + ) + + def get_job(self, job_id: int) -> Any: + """ + Gets the resource representation for a job in Airbyte. + + :param job_id: Required. Id of the Airbyte job + :type job_id: int + """ + return self.run( + endpoint=f"api/{self.api_version}/jobs/get", + json={"id": job_id}, + headers={"accept": "application/json"}, + ) diff --git a/airflow/providers/airbyte/operators/__init__.py b/airflow/providers/airbyte/operators/__init__.py new file mode 100644 index 0000000..217e5db --- /dev/null +++ b/airflow/providers/airbyte/operators/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/airbyte/operators/airbyte.py b/airflow/providers/airbyte/operators/airbyte.py new file mode 100644 index 0000000..6932fa3 --- /dev/null +++ b/airflow/providers/airbyte/operators/airbyte.py @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import Optional + +from airflow.models import BaseOperator +from airflow.providers.airbyte.hooks.airbyte import AirbyteHook +from airflow.utils.decorators import apply_defaults + + +class AirbyteTriggerSyncOperator(BaseOperator): + """ + This operator allows you to submit a job to an Airbyte server to run a integration + process between your source and destination. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AirbyteTriggerSyncOperator` + + :param airbyte_conn_id: Required. The name of the Airflow connection to get connection + information for Airbyte. + :type airbyte_conn_id: str + :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination. + :type connection_id: str + :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API. + This is useful for submitting long running jobs and + waiting on them asynchronously using the AirbyteJobSensor. + :type asynchronous: bool + :param api_version: Optional. Airbyte API version. + :type api_version: str + :param wait_seconds: Optional. Number of seconds between checks. Only used when ``asynchronous`` is False. + :type wait_seconds: float + :param timeout: Optional. 
The amount of time, in seconds, to wait for the request to complete. + Only used when ``asynchronous`` is False. + :type timeout: float + """ + + template_fields = ('connection_id',) + + @apply_defaults + def __init__( + self, + connection_id: str, + airbyte_conn_id: str = "airbyte_default", + asynchronous: Optional[bool] = False, + api_version: Optional[str] = "v1", + wait_seconds: Optional[float] = 3, + timeout: Optional[float] = 3600, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.airbyte_conn_id = airbyte_conn_id + self.connection_id = connection_id + self.timeout = timeout + self.api_version = api_version + self.wait_seconds = wait_seconds + self.asynchronous = asynchronous + + def execute(self, context) -> None: + """Create Airbyte Job and wait to finish""" + hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version) + job_object = hook.submit_sync_connection(connection_id=self.connection_id) + job_id = job_object.json()['job']['id'] + + self.log.info("Job %s was submitted to Airbyte Server", job_id) + if not self.asynchronous: + self.log.info('Waiting for job %s to complete', job_id) + hook.wait_for_job(job_id=job_id, wait_seconds=self.wait_seconds, timeout=self.timeout) + self.log.info('Job %s completed successfully', job_id) + + return job_id diff --git a/airflow/providers/airbyte/provider.yaml b/airflow/providers/airbyte/provider.yaml new file mode 100644 index 0000000..77b109f --- /dev/null +++ b/airflow/providers/airbyte/provider.yaml @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +--- +package-name: apache-airflow-providers-airbyte +name: Airbyte +description: | + `Airbyte <https://airbyte.io/>`__ + +versions: + - 1.0.0 + +integrations: + - integration-name: Airbyte + external-doc-url: https://www.airbyte.io/ + logo: /integration-logos/airbyte/Airbyte.png + how-to-guide: + - /docs/apache-airflow-providers-airbyte/operators/airbyte.rst + tags: [service] + +operators: + - integration-name: Airbyte + python-modules: + - airflow.providers.airbyte.operators.airbyte + +hooks: + - integration-name: Airbyte + python-modules: + - airflow.providers.airbyte.hooks.airbyte + +sensors: + - integration-name: Airbyte + python-modules: + - airflow.providers.airbyte.sensors.airbyte + +hook-class-names: + - airflow.providers.airbyte.hooks.airbyte.AirbyteHook diff --git a/airflow/providers/airbyte/sensors/__init__.py b/airflow/providers/airbyte/sensors/__init__.py new file mode 100644 index 0000000..13a8339 --- /dev/null +++ b/airflow/providers/airbyte/sensors/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/airbyte/sensors/airbyte.py b/airflow/providers/airbyte/sensors/airbyte.py new file mode 100644 index 0000000..9799ade --- /dev/null +++ b/airflow/providers/airbyte/sensors/airbyte.py @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""This module contains a Airbyte Job sensor.""" +from typing import Optional + +from airflow.exceptions import AirflowException +from airflow.providers.airbyte.hooks.airbyte import AirbyteHook +from airflow.sensors.base import BaseSensorOperator +from airflow.utils.decorators import apply_defaults + + +class AirbyteJobSensor(BaseSensorOperator): + """ + Check for the state of a previously submitted Airbyte job. + + :param airbyte_job_id: Required. Id of the Airbyte job + :type airbyte_job_id: str + :param airbyte_conn_id: Required. 
The name of the Airflow connection to get + connection information for Airbyte. + :type airbyte_conn_id: str + :param api_version: Optional. Airbyte API version. + :type api_version: str + """ + + template_fields = ('airbyte_job_id',) + ui_color = '#6C51FD' + + @apply_defaults + def __init__( + self, + *, + airbyte_job_id: str, + airbyte_conn_id: str = 'airbyte_default', + api_version: Optional[str] = "v1", + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.airbyte_conn_id = airbyte_conn_id + self.airbyte_job_id = airbyte_job_id + self.api_version = api_version + + def poke(self, context: dict) -> bool: + hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version) + job = hook.get_job(job_id=self.airbyte_job_id) + status = job.json()['job']['status'] + + if status == hook.FAILED: + raise AirflowException(f"Job failed: \n{job}") + elif status == hook.CANCELLED: + raise AirflowException(f"Job was cancelled: \n{job}") + elif status == hook.SUCCEEDED: + self.log.info("Job %s completed successfully.", self.airbyte_job_id) + return True + elif status == hook.ERROR: + self.log.info("Job %s attempt has failed.", self.airbyte_job_id) + + self.log.info("Waiting for job %s to complete.", self.airbyte_job_id) + return False diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json index 81a3ba4..6027656 100644 --- a/airflow/providers/dependencies.json +++ b/airflow/providers/dependencies.json @@ -1,4 +1,7 @@ { + "airbyte": [ + "http" + ], "amazon": [ "apache.hive", "google", diff --git a/docs/apache-airflow-providers-airbyte/commits.rst b/docs/apache-airflow-providers-airbyte/commits.rst new file mode 100644 index 0000000..cae1272 --- /dev/null +++ b/docs/apache-airflow-providers-airbyte/commits.rst @@ -0,0 +1,27 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Package apache-airflow-providers-airbyte +---------------------------------------- + +`Airbyte <https://airbyte.io/>`__ + + +This is detailed commit list of changes for versions provider package: ``airbyte``. +For high-level changelog, see :doc:`package information including changelog <index>`. diff --git a/docs/apache-airflow-providers-airbyte/connections.rst b/docs/apache-airflow-providers-airbyte/connections.rst new file mode 100644 index 0000000..31b69c7 --- /dev/null +++ b/docs/apache-airflow-providers-airbyte/connections.rst @@ -0,0 +1,36 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + + + +Airbyte Connection +================== +The Airbyte connection type use the HTTP protocol. + +Configuring the Connection +-------------------------- +Host(required) + The host to connect to the Airbyte server. + +Port (required) + The port for the Airbyte server. + +Login (optional) + Specify the user name to connect. + +Password (optional) + Specify the password to connect. diff --git a/docs/apache-airflow-providers-airbyte/index.rst b/docs/apache-airflow-providers-airbyte/index.rst new file mode 100644 index 0000000..d83f5e0 --- /dev/null +++ b/docs/apache-airflow-providers-airbyte/index.rst @@ -0,0 +1,121 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +``apache-airflow-providers-airbyte`` +==================================== + +Content +------- + +.. toctree:: + :maxdepth: 1 + :caption: Guides + + Operators <operators/airbyte> + Connection types <connections> + +.. toctree:: + :maxdepth: 1 + :caption: References + + Python API <_api/airflow/providers/airbyte/index> + +.. 
toctree:: + :maxdepth: 1 + :caption: Resources + + Example DAGs <https://github.com/apache/airflow/tree/master/airflow/providers/airbyte/example_dags> + PyPI Repository <https://pypi.org/project/apache-airflow-providers-airbyte/> + +.. toctree:: + :maxdepth: 1 + :caption: Commits + + Detailed list of commits <commits> + +Package apache-airflow-providers-airbyte +---------------------------------------- + +`Airbyte <https://www.airbyte.io/>`__ + + +Release: 1.0.0 + +Provider package +---------------- + +This is a provider package for ``airbyte`` provider. All classes for this provider package +are in ``airflow.providers.airbyte`` python package. + +Installation +------------ + +.. note:: + + On November 2020, new version of PIP (20.3) has been released with a new, 2020 resolver. This resolver + does not yet work with Apache Airflow and might lead to errors in installation - depends on your choice + of extras. In order to install Airflow you need to either downgrade pip to version 20.2.4 + ``pip install --upgrade pip==20.2.4`` or, in case you use Pip 20.3, you need to add option + ``--use-deprecated legacy-resolver`` to your pip install command. + + +You can install this package on top of an existing airflow 2.* installation via +``pip install apache-airflow-providers-airbyte`` + +Cross provider package dependencies +----------------------------------- + +Those are dependencies that might be needed in order to use all the features of the package. +You need to install the specified backport providers package in order to use them. + +You can install such cross-provider dependencies when installing from PyPI. For example: + +.. 
code-block:: bash + + pip install apache-airflow-providers-airbyte[http] + + +================================================================================================ ======== +Dependent package Extra +================================================================================================ ======== +`apache-airflow-providers-http <https://airflow.apache.org/docs/apache-airflow-providers-http>`_ ``http`` +================================================================================================ ======== + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Changelog +--------- + +1.0.0 +..... + +Initial version of the provider. diff --git a/docs/apache-airflow-providers-airbyte/operators/airbyte.rst b/docs/apache-airflow-providers-airbyte/operators/airbyte.rst new file mode 100644 index 0000000..b674627 --- /dev/null +++ b/docs/apache-airflow-providers-airbyte/operators/airbyte.rst @@ -0,0 +1,58 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/operator:AirbyteTriggerSyncOperator: + +AirbyteTriggerSyncOperator +========================== + +Use the :class:`~airflow.providers.airbyte.operators.AirbyteTriggerSyncOperator` to +trigger an existing ConnectionId sync job in Airbyte. + +.. warning:: + This operator triggers a synchronization job in Airbyte. + If triggered again, this operator does not guarantee idempotency. + You must be aware of the source (database, API, etc) you are updating/syncing and + the method applied to perform the operation in Airbyte. + + +Using the Operator +^^^^^^^^^^^^^^^^^^ + +The AirbyteTriggerSyncOperator requires the ``connection_id``; this is the UUID identifier +created in Airbyte between a source and destination synchronization job. +Use the ``airbyte_conn_id`` parameter to specify the Airbyte connection to use to +connect to your account. + +You can trigger a synchronization job in Airflow in two ways with the Operator. The first one +is a synchronous process. This will trigger the Airbyte job and the Operator manages the status +of the job. Another way is to use the flag ``async = True`` so the Operator only triggers the job and +returns the ``job_id`` that should be passed to the AirbyteSensor. + +An example using the synchronous way: + +.. 
exampleinclude:: /../../airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py + :language: python + :start-after: [START howto_operator_airbyte_synchronous] + :end-before: [END howto_operator_airbyte_synchronous] + +An example using the async way: + +.. exampleinclude:: /../../airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py + :language: python + :start-after: [START howto_operator_airbyte_asynchronous] + :end-before: [END howto_operator_airbyte_asynchronous] diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst index 601c6bc..b902868 100644 --- a/docs/apache-airflow/extra-packages-ref.rst +++ b/docs/apache-airflow/extra-packages-ref.rst @@ -141,6 +141,8 @@ Those are extras that add dependencies needed for integration with external serv +---------------------+-----------------------------------------------------+-----------------------------------------------------+ | extra | install command | enables | +=====================+=====================================================+=====================================================+ +| airbyte | ``pip install 'apache-airflow[airbyte]'`` | Airbyte hooks and operators | ++---------------------+-----------------------------------------------------+-----------------------------------------------------+ | amazon | ``pip install 'apache-airflow[amazon]'`` | Amazon Web Services | +---------------------+-----------------------------------------------------+-----------------------------------------------------+ | azure | ``pip install 'apache-airflow[microsoft.azure]'`` | Microsoft Azure | diff --git a/docs/integration-logos/airbyte/Airbyte.png b/docs/integration-logos/airbyte/Airbyte.png new file mode 100644 index 0000000..0cc1d07 Binary files /dev/null and b/docs/integration-logos/airbyte/Airbyte.png differ diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index ace29d3..2ebb5d1 100644 --- a/docs/spelling_wordlist.txt 
+++ b/docs/spelling_wordlist.txt @@ -1,6 +1,7 @@ Ack Acyclic Airbnb +Airbyte AirflowException Aizhamal Alphasort @@ -420,6 +421,7 @@ acyclic adhoc aijamalnk airbnb +airbyte airfl airflowignore ajax diff --git a/setup.py b/setup.py index 9ccd60e..5ec7d37 100644 --- a/setup.py +++ b/setup.py @@ -523,6 +523,7 @@ devel_hadoop = devel_minreq + hdfs + hive + kerberos + presto + webhdfs # Dict of all providers which are part of the Apache Airflow repository together with their requirements PROVIDERS_REQUIREMENTS: Dict[str, List[str]] = { + 'airbyte': [], 'amazon': amazon, 'apache.beam': apache_beam, 'apache.cassandra': cassandra, diff --git a/tests/core/test_providers_manager.py b/tests/core/test_providers_manager.py index 5fd0af4..7299971 100644 --- a/tests/core/test_providers_manager.py +++ b/tests/core/test_providers_manager.py @@ -21,6 +21,7 @@ import unittest from airflow.providers_manager import ProvidersManager ALL_PROVIDERS = [ + 'apache-airflow-providers-airbyte', 'apache-airflow-providers-amazon', 'apache-airflow-providers-apache-beam', 'apache-airflow-providers-apache-cassandra', diff --git a/tests/providers/airbyte/__init__.py b/tests/providers/airbyte/__init__.py new file mode 100644 index 0000000..13a8339 --- /dev/null +++ b/tests/providers/airbyte/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/airbyte/hooks/__init__.py b/tests/providers/airbyte/hooks/__init__.py new file mode 100644 index 0000000..13a8339 --- /dev/null +++ b/tests/providers/airbyte/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/airbyte/hooks/test_airbyte.py b/tests/providers/airbyte/hooks/test_airbyte.py new file mode 100644 index 0000000..09f10be --- /dev/null +++ b/tests/providers/airbyte/hooks/test_airbyte.py @@ -0,0 +1,126 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +from unittest import mock + +import pytest +import requests_mock + +from airflow.exceptions import AirflowException +from airflow.models import Connection +from airflow.providers.airbyte.hooks.airbyte import AirbyteHook +from airflow.utils import db + + +class TestAirbyteHook(unittest.TestCase): + """ + Test all functions from Airbyte Hook + """ + + airbyte_conn_id = 'airbyte_conn_id_test' + connection_id = 'conn_test_sync' + job_id = 1 + sync_connection_endpoint = 'http://test-airbyte:8001/api/v1/connections/sync' + get_job_endpoint = 'http://test-airbyte:8001/api/v1/jobs/get' + _mock_sync_conn_success_response_body = {'job': {'id': 1}} + _mock_job_status_success_response_body = {'job': {'status': 'succeeded'}} + + def setUp(self): + db.merge_conn( + Connection( + conn_id='airbyte_conn_id_test', conn_type='http', host='http://test-airbyte', port=8001 + ) + ) + self.hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id) + + def return_value_get_job(self, status): + response = mock.Mock() + response.json.return_value = {'job': {'status': status}} + return response + + @requests_mock.mock() + def test_submit_sync_connection(self, m): + m.post( + self.sync_connection_endpoint, status_code=200, json=self._mock_sync_conn_success_response_body + ) + resp = self.hook.submit_sync_connection(connection_id=self.connection_id) + assert resp.status_code == 200 + assert resp.json() == self._mock_sync_conn_success_response_body + + @requests_mock.mock() + def test_get_job_status(self, m): + m.post(self.get_job_endpoint, status_code=200, 
json=self._mock_job_status_success_response_body) + resp = self.hook.get_job(job_id=self.job_id) + assert resp.status_code == 200 + assert resp.json() == self._mock_job_status_success_response_body + + @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job') + def test_wait_for_job_succeeded(self, mock_get_job): + mock_get_job.side_effect = [self.return_value_get_job(self.hook.SUCCEEDED)] + self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0) + mock_get_job.assert_called_once_with(job_id=self.job_id) + + @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job') + def test_wait_for_job_error(self, mock_get_job): + mock_get_job.side_effect = [ + self.return_value_get_job(self.hook.RUNNING), + self.return_value_get_job(self.hook.ERROR), + ] + with pytest.raises(AirflowException, match="Job failed"): + self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0) + + calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)] + assert mock_get_job.has_calls(calls) + + @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job') + def test_wait_for_job_timeout(self, mock_get_job): + mock_get_job.side_effect = [ + self.return_value_get_job(self.hook.PENDING), + self.return_value_get_job(self.hook.RUNNING), + self.return_value_get_job(self.hook.RUNNING), + ] + with pytest.raises(AirflowException, match="Timeout"): + self.hook.wait_for_job(job_id=self.job_id, wait_seconds=2, timeout=1) + + calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)] + assert mock_get_job.has_calls(calls) + + @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job') + def test_wait_for_job_state_unrecognized(self, mock_get_job): + mock_get_job.side_effect = [ + self.return_value_get_job(self.hook.RUNNING), + self.return_value_get_job("UNRECOGNIZED"), + ] + with pytest.raises(Exception, match="unexpected state"): + self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0) 
+ + calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)] + assert mock_get_job.has_calls(calls) + + @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job') + def test_wait_for_job_cancelled(self, mock_get_job): + mock_get_job.side_effect = [ + self.return_value_get_job(self.hook.RUNNING), + self.return_value_get_job(self.hook.CANCELLED), + ] + with pytest.raises(AirflowException, match="Job was cancelled"): + self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0) + + calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)] + assert mock_get_job.has_calls(calls) diff --git a/tests/providers/airbyte/operators/__init__.py b/tests/providers/airbyte/operators/__init__.py new file mode 100644 index 0000000..13a8339 --- /dev/null +++ b/tests/providers/airbyte/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/tests/providers/airbyte/operators/test_airbyte.py b/tests/providers/airbyte/operators/test_airbyte.py new file mode 100644 index 0000000..bc56c5d --- /dev/null +++ b/tests/providers/airbyte/operators/test_airbyte.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +import unittest +from unittest import mock + +from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator + + +class TestAirbyteTriggerSyncOp(unittest.TestCase): + """ + Test execute function from Airbyte Operator + """ + + airbyte_conn_id = 'test_airbyte_conn_id' + connection_id = 'test_airbyte_connection' + job_id = 1 + wait_seconds = 0 + timeout = 360 + + @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.submit_sync_connection') + @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.wait_for_job', return_value=None) + def test_execute(self, mock_wait_for_job, mock_submit_sync_connection): + mock_submit_sync_connection.return_value = mock.Mock( + **{'json.return_value': {'job': {'id': self.job_id}}} + ) + + op = AirbyteTriggerSyncOperator( + task_id='test_Airbyte_op', + airbyte_conn_id=self.airbyte_conn_id, + connection_id=self.connection_id, + wait_seconds=self.wait_seconds, + timeout=self.timeout, + ) + op.execute({}) + + mock_submit_sync_connection.assert_called_once_with(connection_id=self.connection_id) + mock_wait_for_job.assert_called_once_with( + job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout + ) diff --git a/tests/providers/airbyte/sensors/__init__.py b/tests/providers/airbyte/sensors/__init__.py new file mode 100644 index 0000000..13a8339 --- /dev/null +++ b/tests/providers/airbyte/sensors/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/airbyte/sensors/test_airbyte.py b/tests/providers/airbyte/sensors/test_airbyte.py new file mode 100644 index 0000000..5bd69b8 --- /dev/null +++ b/tests/providers/airbyte/sensors/test_airbyte.py @@ -0,0 +1,93 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import unittest +from unittest import mock + +import pytest + +from airflow import AirflowException +from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor + + +class TestAirbyteJobSensor(unittest.TestCase): + + task_id = "task-id" + airbyte_conn_id = "airbyte-conn-test" + job_id = 1 + timeout = 120 + + def get_job(self, status): + response = mock.Mock() + response.json.return_value = {'job': {'status': status}} + return response + + @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job') + def test_done(self, mock_get_job): + mock_get_job.return_value = self.get_job('succeeded') + + sensor = AirbyteJobSensor( + task_id=self.task_id, + airbyte_job_id=self.job_id, + airbyte_conn_id=self.airbyte_conn_id, + ) + ret = sensor.poke(context={}) + mock_get_job.assert_called_once_with(job_id=self.job_id) + assert ret + + @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job') + def test_failed(self, mock_get_job): + mock_get_job.return_value = self.get_job('failed') + + sensor = AirbyteJobSensor( + task_id=self.task_id, + airbyte_job_id=self.job_id, + airbyte_conn_id=self.airbyte_conn_id, + ) + with pytest.raises(AirflowException, match="Job failed"): + sensor.poke(context={}) + + mock_get_job.assert_called_once_with(job_id=self.job_id) + + @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job') + def test_running(self, mock_get_job): + mock_get_job.return_value = self.get_job('running') + + sensor = AirbyteJobSensor( + task_id=self.task_id, + airbyte_job_id=self.job_id, + airbyte_conn_id=self.airbyte_conn_id, + ) + ret = sensor.poke(context={}) + + mock_get_job.assert_called_once_with(job_id=self.job_id) + + assert not ret + + @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job') + def test_cancelled(self, mock_get_job): + mock_get_job.return_value = self.get_job('cancelled') + + sensor = AirbyteJobSensor( + task_id=self.task_id, + airbyte_job_id=self.job_id, + 
airbyte_conn_id=self.airbyte_conn_id, + ) + with pytest.raises(AirflowException, match="Job was cancelled"): + sensor.poke(context={}) + + mock_get_job.assert_called_once_with(job_id=self.job_id)
