This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 25fd66c4b4 Ensure Airbyte Provider is Compatible with Cloud and Config
APIs (#37943)
25fd66c4b4 is described below
commit 25fd66c4b48fd940b11b09d3b590ab9d002cda11
Author: Chris Hronek <[email protected]>
AuthorDate: Wed Mar 6 12:48:40 2024 -0700
Ensure Airbyte Provider is Compatible with Cloud and Config APIs (#37943)
* Add parameter to specify which airbyte api to use
* Fix provider tests
* Add documentation outlining api types
* Add api_type to the AirbyteJobSensor
* Make api_type values less redundant
---
airflow/providers/airbyte/hooks/airbyte.py | 134 +++++++++++++++------
airflow/providers/airbyte/operators/airbyte.py | 20 ++-
airflow/providers/airbyte/sensors/airbyte.py | 24 +++-
airflow/providers/airbyte/triggers/airbyte.py | 8 +-
.../connections.rst | 12 +-
.../operators/airbyte.rst | 5 +
tests/providers/airbyte/triggers/test_airbyte.py | 26 +++-
7 files changed, 174 insertions(+), 55 deletions(-)
diff --git a/airflow/providers/airbyte/hooks/airbyte.py
b/airflow/providers/airbyte/hooks/airbyte.py
index 4545eeb7ba..e0ae41ff13 100644
--- a/airflow/providers/airbyte/hooks/airbyte.py
+++ b/airflow/providers/airbyte/hooks/airbyte.py
@@ -20,7 +20,7 @@ from __future__ import annotations
import base64
import json
import time
-from typing import TYPE_CHECKING, Any, TypeVar
+from typing import TYPE_CHECKING, Any, Literal, TypeVar
import aiohttp
from aiohttp import ClientResponseError
@@ -42,6 +42,7 @@ class AirbyteHook(HttpHook):
:param airbyte_conn_id: Optional. The name of the Airflow connection to get
connection information for Airbyte. Defaults to "airbyte_default".
:param api_version: Optional. Airbyte API version. Defaults to "v1".
+ :param api_type: Optional. The type of Airbyte API to use. Either "config"
or "cloud". Defaults to "config".
"""
conn_name_attr = "airbyte_conn_id"
@@ -57,23 +58,35 @@ class AirbyteHook(HttpHook):
ERROR = "error"
INCOMPLETE = "incomplete"
- def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version:
str = "v1") -> None:
+ def __init__(
+ self,
+ airbyte_conn_id: str = "airbyte_default",
+ api_version: str = "v1",
+ api_type: Literal["config", "cloud"] = "config",
+ ) -> None:
super().__init__(http_conn_id=airbyte_conn_id)
self.api_version: str = api_version
+ self.api_type: str = api_type
async def get_headers_tenants_from_connection(self) -> tuple[dict[str,
Any], str]:
"""Get Headers, tenants from the connection details."""
connection: Connection = await
sync_to_async(self.get_connection)(self.http_conn_id)
base_url = connection.host
- credentials = f"{connection.login}:{connection.password}"
- credentials_base64 =
base64.b64encode(credentials.encode("utf-8")).decode("utf-8")
-
- authorized_headers = {
- "accept": "application/json",
- "content-type": "application/json",
- "authorization": f"Basic {credentials_base64}",
- }
+ if self.api_type == "config":
+ credentials = f"{connection.login}:{connection.password}"
+ credentials_base64 =
base64.b64encode(credentials.encode("utf-8")).decode("utf-8")
+ authorized_headers = {
+ "accept": "application/json",
+ "content-type": "application/json",
+ "authorization": f"Basic {credentials_base64}",
+ }
+ else:
+ authorized_headers = {
+ "accept": "application/json",
+ "content-type": "application/json",
+ "authorization": f"Bearer {connection.password}",
+ }
return authorized_headers, base_url
@@ -84,16 +97,28 @@ class AirbyteHook(HttpHook):
:param job_id: The ID of an Airbyte Sync Job.
"""
headers, base_url = await self.get_headers_tenants_from_connection()
- url = f"{base_url}/api/{self.api_version}/jobs/get"
- self.log.info("URL for api request: %s", url)
- async with aiohttp.ClientSession(headers=headers) as session:
- async with session.post(url=url, data=json.dumps({"id": job_id}))
as response:
- try:
- response.raise_for_status()
- return await response.json()
- except ClientResponseError as e:
- msg = f"{e.status}: {e.message} - {e.request_info}"
- raise AirflowException(msg)
+ if self.api_type == "config":
+ url = f"{base_url}/api/{self.api_version}/jobs/get"
+ self.log.info("URL for api request: %s", url)
+ async with aiohttp.ClientSession(headers=headers) as session:
+ async with session.post(url=url, data=json.dumps({"id":
job_id})) as response:
+ try:
+ response.raise_for_status()
+ return await response.json()
+ except ClientResponseError as e:
+ msg = f"{e.status}: {e.message} - {e.request_info}"
+ raise AirflowException(msg)
+ else:
+ url = f"{base_url}/{self.api_version}/jobs/{job_id}"
+ self.log.info("URL for api request: %s", url)
+ async with aiohttp.ClientSession(headers=headers) as session:
+ async with session.get(url=url) as response:
+ try:
+ response.raise_for_status()
+ return await response.json()
+ except ClientResponseError as e:
+ msg = f"{e.status}: {e.message} - {e.request_info}"
+ raise AirflowException(msg)
async def get_job_status(self, job_id: int) -> str:
"""
@@ -103,8 +128,10 @@ class AirbyteHook(HttpHook):
"""
self.log.info("Getting the status of job run %s.", job_id)
response = await self.get_job_details(job_id=job_id)
- job_run_status: str = response["job"]["status"]
- return job_run_status
+ if self.api_type == "config":
+ return str(response["job"]["status"])
+ else:
+ return str(response["status"])
def wait_for_job(self, job_id: str | int, wait_seconds: float = 3,
timeout: float | None = 3600) -> None:
"""
@@ -124,7 +151,10 @@ class AirbyteHook(HttpHook):
time.sleep(wait_seconds)
try:
job = self.get_job(job_id=(int(job_id)))
- state = job.json()["job"]["status"]
+ if self.api_type == "config":
+ state = job.json()["job"]["status"]
+ else:
+ state = job.json()["status"]
except AirflowException as err:
self.log.info("Retrying. Airbyte API returned server error
when waiting for job: %s", err)
continue
@@ -146,11 +176,23 @@ class AirbyteHook(HttpHook):
:param connection_id: Required. The ConnectionId of the Airbyte
Connection.
"""
- return self.run(
- endpoint=f"api/{self.api_version}/connections/sync",
- json={"connectionId": connection_id},
- headers={"accept": "application/json"},
- )
+ if self.api_type == "config":
+ return self.run(
+ endpoint=f"api/{self.api_version}/connections/sync",
+ json={"connectionId": connection_id},
+ headers={"accept": "application/json"},
+ )
+ else:
+ conn = self.get_connection(self.http_conn_id)
+ self.method = "POST"
+ return self.run(
+ endpoint=f"{self.api_version}/jobs",
+ headers={"accept": "application/json", "authorization":
f"Bearer {conn.password}"},
+ json={
+ "jobType": "sync",
+ "connectionId": connection_id,
+ }, # TODO: add an option to pass jobType = reset
+ )
def get_job(self, job_id: int) -> Any:
"""
@@ -158,11 +200,19 @@ class AirbyteHook(HttpHook):
:param job_id: Required. Id of the Airbyte job
"""
- return self.run(
- endpoint=f"api/{self.api_version}/jobs/get",
- json={"id": job_id},
- headers={"accept": "application/json"},
- )
+ if self.api_type == "config":
+ return self.run(
+ endpoint=f"api/{self.api_version}/jobs/get",
+ json={"id": job_id},
+ headers={"accept": "application/json"},
+ )
+ else:
+ self.method = "GET"
+ conn = self.get_connection(self.http_conn_id)
+ return self.run(
+ endpoint=f"{self.api_version}/jobs/{job_id}",
+ headers={"accept": "application/json", "authorization":
f"Bearer {conn.password}"},
+ )
def cancel_job(self, job_id: int) -> Any:
"""
@@ -170,11 +220,19 @@ class AirbyteHook(HttpHook):
:param job_id: Required. Id of the Airbyte job
"""
- return self.run(
- endpoint=f"api/{self.api_version}/jobs/cancel",
- json={"id": job_id},
- headers={"accept": "application/json"},
- )
+ if self.api_type == "config":
+ return self.run(
+ endpoint=f"api/{self.api_version}/jobs/cancel",
+ json={"id": job_id},
+ headers={"accept": "application/json"},
+ )
+ else:
+ self.method = "DELETE"
+ conn = self.get_connection(self.http_conn_id)
+ return self.run(
+ endpoint=f"{self.api_version}/jobs/{job_id}",
+ headers={"accept": "application/json", "authorization":
f"Bearer {conn.password}"},
+ )
def test_connection(self):
"""Tests the Airbyte connection by hitting the health API."""
diff --git a/airflow/providers/airbyte/operators/airbyte.py
b/airflow/providers/airbyte/operators/airbyte.py
index d8fdddb0e2..c2f0a56202 100644
--- a/airflow/providers/airbyte/operators/airbyte.py
+++ b/airflow/providers/airbyte/operators/airbyte.py
@@ -18,7 +18,7 @@
from __future__ import annotations
import time
-from typing import TYPE_CHECKING, Any, Sequence
+from typing import TYPE_CHECKING, Any, Literal, Sequence
from airflow.configuration import conf
from airflow.exceptions import AirflowException
@@ -46,6 +46,7 @@ class AirbyteTriggerSyncOperator(BaseOperator):
waiting on them asynchronously using the AirbyteJobSensor. Defaults to
False.
:param deferrable: Run operator in the deferrable mode.
:param api_version: Optional. Airbyte API version. Defaults to "v1".
+ :param api_type: Optional. The type of Airbyte API to use. Either "config"
or "cloud". Defaults to "config".
:param wait_seconds: Optional. Number of seconds between checks. Only used
when ``asynchronous`` is False.
Defaults to 3 seconds.
:param timeout: Optional. The amount of time, in seconds, to wait for the
request to complete.
@@ -62,6 +63,7 @@ class AirbyteTriggerSyncOperator(BaseOperator):
asynchronous: bool = False,
deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
api_version: str = "v1",
+ api_type: Literal["config", "cloud"] = "config",
wait_seconds: float = 3,
timeout: float = 3600,
**kwargs,
@@ -71,16 +73,23 @@ class AirbyteTriggerSyncOperator(BaseOperator):
self.connection_id = connection_id
self.timeout = timeout
self.api_version = api_version
+ self.api_type = api_type
self.wait_seconds = wait_seconds
self.asynchronous = asynchronous
self.deferrable = deferrable
def execute(self, context: Context) -> None:
"""Create Airbyte Job and wait to finish."""
- hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id,
api_version=self.api_version)
+ hook = AirbyteHook(
+ airbyte_conn_id=self.airbyte_conn_id,
api_version=self.api_version, api_type=self.api_type
+ )
job_object =
hook.submit_sync_connection(connection_id=self.connection_id)
- self.job_id = job_object.json()["job"]["id"]
- state = job_object.json()["job"]["status"]
+ if self.api_type == "config":
+ self.job_id = job_object.json()["job"]["id"]
+ state = job_object.json()["job"]["status"]
+ else:
+ self.job_id = job_object.json()["jobId"]
+ state = job_object.json()["status"]
end_time = time.time() + self.timeout
self.log.info("Job %s was submitted to Airbyte Server", self.job_id)
@@ -92,6 +101,7 @@ class AirbyteTriggerSyncOperator(BaseOperator):
timeout=self.execution_timeout,
trigger=AirbyteSyncTrigger(
conn_id=self.airbyte_conn_id,
+ api_type=self.api_type,
job_id=self.job_id,
end_time=end_time,
poll_interval=60,
@@ -128,7 +138,7 @@ class AirbyteTriggerSyncOperator(BaseOperator):
def on_kill(self):
"""Cancel the job if task is cancelled."""
- hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id)
+ hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id,
api_type=self.api_type)
if self.job_id:
self.log.info("on_kill: cancel the airbyte Job %s", self.job_id)
hook.cancel_job(self.job_id)
diff --git a/airflow/providers/airbyte/sensors/airbyte.py
b/airflow/providers/airbyte/sensors/airbyte.py
index 4cc280cfc4..2db010d3bb 100644
--- a/airflow/providers/airbyte/sensors/airbyte.py
+++ b/airflow/providers/airbyte/sensors/airbyte.py
@@ -20,7 +20,7 @@ from __future__ import annotations
import time
import warnings
-from typing import TYPE_CHECKING, Any, Sequence
+from typing import TYPE_CHECKING, Any, Literal, Sequence
from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning, AirflowSkipException
@@ -41,6 +41,7 @@ class AirbyteJobSensor(BaseSensorOperator):
:param deferrable: Run sensor in the deferrable mode.
connection information for Airbyte. Defaults to "airbyte_default".
:param api_version: Optional. Airbyte API version. Defaults to "v1".
+ :param api_type: Optional. The type of Airbyte API to use. Either "config"
or "cloud". Defaults to "config".
"""
template_fields: Sequence[str] = ("airbyte_job_id",)
@@ -53,6 +54,7 @@ class AirbyteJobSensor(BaseSensorOperator):
deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
airbyte_conn_id: str = "airbyte_default",
api_version: str = "v1",
+ api_type: Literal["config", "cloud"] = "config",
**kwargs,
) -> None:
if deferrable:
@@ -77,11 +79,17 @@ class AirbyteJobSensor(BaseSensorOperator):
self.airbyte_conn_id = airbyte_conn_id
self.airbyte_job_id = airbyte_job_id
self.api_version = api_version
+ self.api_type = api_type
def poke(self, context: Context) -> bool:
- hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id,
api_version=self.api_version)
+ hook = AirbyteHook(
+ airbyte_conn_id=self.airbyte_conn_id,
api_version=self.api_version, api_type=self.api_type
+ )
job = hook.get_job(job_id=self.airbyte_job_id)
- status = job.json()["job"]["status"]
+ if self.api_type == "config":
+ status = job.json()["job"]["status"]
+ else:
+ status = job.json()["status"]
if status == hook.FAILED:
# TODO: remove this if block when min_airflow_version is set to
higher than 2.7.1
@@ -109,9 +117,14 @@ class AirbyteJobSensor(BaseSensorOperator):
if not self.deferrable:
super().execute(context)
else:
- hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id)
+ hook = AirbyteHook(
+ airbyte_conn_id=self.airbyte_conn_id,
api_version=self.api_version, api_type=self.api_type
+ )
job = hook.get_job(job_id=(int(self.airbyte_job_id)))
- state = job.json()["job"]["status"]
+ if self.api_type == "config":
+ state = job.json()["job"]["status"]
+ else:
+ state = job.json()["status"]
end_time = time.time() + self.timeout
self.log.info("Airbyte Job Id: Job %s", self.airbyte_job_id)
@@ -120,6 +133,7 @@ class AirbyteJobSensor(BaseSensorOperator):
self.defer(
timeout=self.execution_timeout,
trigger=AirbyteSyncTrigger(
+ api_type=self.api_type,
conn_id=self.airbyte_conn_id,
job_id=self.airbyte_job_id,
end_time=end_time,
diff --git a/airflow/providers/airbyte/triggers/airbyte.py
b/airflow/providers/airbyte/triggers/airbyte.py
index cec032a5e0..67a9be69b8 100644
--- a/airflow/providers/airbyte/triggers/airbyte.py
+++ b/airflow/providers/airbyte/triggers/airbyte.py
@@ -18,7 +18,7 @@ from __future__ import annotations
import asyncio
import time
-from typing import Any, AsyncIterator
+from typing import Any, AsyncIterator, Literal
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
from airflow.triggers.base import BaseTrigger, TriggerEvent
@@ -32,6 +32,7 @@ class AirbyteSyncTrigger(BaseTrigger):
makes use of asynchronous communication to check the progress of a job run
over time.
:param conn_id: The connection identifier for connecting to Airbyte.
+ :param api_type: The type of Airbyte API to use. Either "config" or
"cloud".
:param job_id: The ID of an Airbyte Sync job.
:param end_time: Time in seconds to wait for a job run to reach a terminal
status. Defaults to 7 days.
:param poll_interval: polling period in seconds to check for the status.
@@ -41,12 +42,14 @@ class AirbyteSyncTrigger(BaseTrigger):
self,
job_id: int,
conn_id: str,
+ api_type: Literal["config", "cloud"],
end_time: float,
poll_interval: float,
):
super().__init__()
self.job_id = job_id
self.conn_id = conn_id
+ self.api_type = api_type
self.end_time = end_time
self.poll_interval = poll_interval
@@ -57,6 +60,7 @@ class AirbyteSyncTrigger(BaseTrigger):
{
"job_id": self.job_id,
"conn_id": self.conn_id,
+ "api_type": self.api_type,
"end_time": self.end_time,
"poll_interval": self.poll_interval,
},
@@ -64,7 +68,7 @@ class AirbyteSyncTrigger(BaseTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Make async connection to Airbyte, polls for the pipeline run
status."""
- hook = AirbyteHook(airbyte_conn_id=self.conn_id)
+ hook = AirbyteHook(airbyte_conn_id=self.conn_id,
api_type=self.api_type)
try:
while await self.is_still_running(hook):
if self.end_time < time.time():
diff --git a/docs/apache-airflow-providers-airbyte/connections.rst
b/docs/apache-airflow-providers-airbyte/connections.rst
index 31b69c70a6..28f6a3a747 100644
--- a/docs/apache-airflow-providers-airbyte/connections.rst
+++ b/docs/apache-airflow-providers-airbyte/connections.rst
@@ -21,8 +21,8 @@ Airbyte Connection
==================
The Airbyte connection type uses the HTTP protocol.
-Configuring the Connection
---------------------------
+Configuring the Connection - Config API
+---------------------------------------
Host(required)
The host to connect to the Airbyte server.
@@ -34,3 +34,11 @@ Login (optional)
Password (optional)
Specify the password to connect.
+
+Configuring the Connection - Cloud API
+--------------------------------------
+Host(required)
+ The host to connect to the Airbyte Cloud. (Typically
``https://api.airbyte.com``)
+
+Password (required)
+ Cloud API Key obtained from https://portal.airbyte.com/
diff --git a/docs/apache-airflow-providers-airbyte/operators/airbyte.rst
b/docs/apache-airflow-providers-airbyte/operators/airbyte.rst
index 60f47955dd..55eb110163 100644
--- a/docs/apache-airflow-providers-airbyte/operators/airbyte.rst
+++ b/docs/apache-airflow-providers-airbyte/operators/airbyte.rst
@@ -38,6 +38,11 @@ create in Airbyte between a source and destination
synchronization job.
Use the ``airbyte_conn_id`` parameter to specify the Airbyte connection to use
to
connect to your account.
+Airbyte currently supports two different APIs. The first one is the `Config
API
<https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html>`_
+which is specifically used for Open Source Airbyte Instances. The second is
the `Cloud API <https://reference.airbyte.com/reference/start>`_
+which is used for the Airbyte Cloud Service. If you are using Airbyte's Cloud
service,
+then you will need to specify ``api_type="cloud"`` as part of the Operator's
parameters.
+
You can trigger a synchronization job in Airflow in two ways with the
Operator. The first one is a synchronous process.
This Operator will initiate the Airbyte job, and the Operator manages the job
status. Another way is to use the flag
``async = True`` so the Operator only triggers the job and returns the
``job_id``, passed to the AirbyteSensor.
diff --git a/tests/providers/airbyte/triggers/test_airbyte.py
b/tests/providers/airbyte/triggers/test_airbyte.py
index 103df7cf00..4de34f5053 100644
--- a/tests/providers/airbyte/triggers/test_airbyte.py
+++ b/tests/providers/airbyte/triggers/test_airbyte.py
@@ -31,6 +31,7 @@ class TestAirbyteSyncTrigger:
DAG_ID = "airbyte_sync_run"
TASK_ID = "airbyte_sync_run_task_op"
JOB_ID = 1234
+ API_TYPE = "config"
CONN_ID = "airbyte_default"
END_TIME = time.time() + 60 * 60 * 24 * 7
POLL_INTERVAL = 3.0
@@ -38,13 +39,18 @@ class TestAirbyteSyncTrigger:
def test_serialization(self):
"""Assert TestAirbyteSyncTrigger correctly serializes its arguments
and classpath."""
trigger = AirbyteSyncTrigger(
- conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL,
end_time=self.END_TIME, job_id=self.JOB_ID
+ api_type=self.API_TYPE,
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=self.END_TIME,
+ job_id=self.JOB_ID,
)
classpath, kwargs = trigger.serialize()
assert classpath ==
"airflow.providers.airbyte.triggers.airbyte.AirbyteSyncTrigger"
assert kwargs == {
"job_id": self.JOB_ID,
"conn_id": self.CONN_ID,
+ "api_type": self.API_TYPE,
"end_time": self.END_TIME,
"poll_interval": self.POLL_INTERVAL,
}
@@ -55,6 +61,7 @@ class TestAirbyteSyncTrigger:
"""Test AirbyteSyncTrigger is triggered with mocked details and run
successfully."""
mocked_is_still_running.return_value = True
trigger = AirbyteSyncTrigger(
+ api_type=self.API_TYPE,
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
end_time=self.END_TIME,
@@ -83,6 +90,7 @@ class TestAirbyteSyncTrigger:
mocked_is_still_running.return_value = False
mock_get_job_status.return_value = mock_value
trigger = AirbyteSyncTrigger(
+ api_type=self.API_TYPE,
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
end_time=self.END_TIME,
@@ -114,7 +122,11 @@ class TestAirbyteSyncTrigger:
mocked_is_still_running.return_value = False
mock_get_job_status.return_value = mock_value
trigger = AirbyteSyncTrigger(
- conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL,
end_time=self.END_TIME, job_id=self.JOB_ID
+ api_type=self.API_TYPE,
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=self.END_TIME,
+ job_id=self.JOB_ID,
)
expected_result = {
"status": mock_status,
@@ -142,6 +154,7 @@ class TestAirbyteSyncTrigger:
mocked_is_still_running.return_value = False
mock_get_job_status.return_value = mock_value
trigger = AirbyteSyncTrigger(
+ api_type=self.API_TYPE,
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
end_time=self.END_TIME,
@@ -165,6 +178,7 @@ class TestAirbyteSyncTrigger:
mocked_is_still_running.return_value = False
mock_get_job_status.side_effect = Exception("Test exception")
trigger = AirbyteSyncTrigger(
+ api_type=self.API_TYPE,
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
end_time=self.END_TIME,
@@ -190,6 +204,7 @@ class TestAirbyteSyncTrigger:
mock_get_job_status.side_effect = Exception("Test exception")
end_time = time.time()
trigger = AirbyteSyncTrigger(
+ api_type=self.API_TYPE,
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
end_time=end_time,
@@ -223,6 +238,7 @@ class TestAirbyteSyncTrigger:
hook = mock.AsyncMock(AirbyteHook)
hook.get_job_status.return_value = mock_response
trigger = AirbyteSyncTrigger(
+ api_type=self.API_TYPE,
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
end_time=self.END_TIME,
@@ -247,7 +263,11 @@ class TestAirbyteSyncTrigger:
airbyte_hook = mock.AsyncMock(AirbyteHook)
airbyte_hook.get_job_status.return_value = mock_response
trigger = AirbyteSyncTrigger(
- conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL,
end_time=self.END_TIME, job_id=self.JOB_ID
+ api_type=self.API_TYPE,
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=self.END_TIME,
+ job_id=self.JOB_ID,
)
response = await trigger.is_still_running(airbyte_hook)
assert response == expected_status