This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d9121a7b9e Fix `DataFusionAsyncHook` catch 404 (#32855)
d9121a7b9e is described below
commit d9121a7b9e3d2456c1c3cd9c09d8404501a360c8
Author: VladaZakharova <[email protected]>
AuthorDate: Wed Aug 2 13:42:45 2023 +0200
Fix `DataFusionAsyncHook` catch 404 (#32855)
---
airflow/providers/google/cloud/hooks/datafusion.py | 31 +++++++++++++---------
.../google/cloud/datafusion/example_datafusion.py | 11 ++++----
.../cloud/datafusion/example_datafusion_async.py | 11 ++++----
3 files changed, 31 insertions(+), 22 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/datafusion.py b/airflow/providers/google/cloud/hooks/datafusion.py
index 770e3da391..de426bc242 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -511,17 +511,25 @@ class DataFusionAsyncHook(GoogleBaseAsyncHook):
return urljoin(f"{instance_url}/",
f"v3/namespaces/{quote(namespace)}/apps/")
async def _get_link(self, url: str, session):
- async with Token(scopes=self.scopes) as token:
- session_aio = AioSession(session)
- headers = {
- "Authorization": f"Bearer {await token.get()}",
- }
- try:
- pipeline = await session_aio.get(url=url, headers=headers)
- except AirflowException:
- pass # Because the pipeline may not be visible in system yet
-
- return pipeline
+ # Adding sleep generator to catch 404 in case if pipeline was not retrieved during first attempt.
+ for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
+ async with Token(scopes=self.scopes) as token:
+ session_aio = AioSession(session)
+ headers = {
+ "Authorization": f"Bearer {await token.get()}",
+ }
+ try:
+ pipeline = await session_aio.get(url=url, headers=headers)
+ break
+ except Exception as exc:
+ if "404" in str(exc):
+ sleep(time_to_wait)
+ else:
+ raise
+ if pipeline:
+ return pipeline
+ else:
+ raise AirflowException("Could not retrieve pipeline. Aborting.")
async def get_pipeline(
self,
@@ -567,7 +575,6 @@ class DataFusionAsyncHook(GoogleBaseAsyncHook):
pipeline_id=pipeline_id,
session=session,
)
- self.log.info("Response pipeline: %s", pipeline)
pipeline = await pipeline.json(content_type=None)
current_pipeline_state = pipeline["status"]
diff --git a/tests/system/providers/google/cloud/datafusion/example_datafusion.py b/tests/system/providers/google/cloud/datafusion/example_datafusion.py
index 4d3bb7418d..4b26639d15 100644
--- a/tests/system/providers/google/cloud/datafusion/example_datafusion.py
+++ b/tests/system/providers/google/cloud/datafusion/example_datafusion.py
@@ -42,21 +42,22 @@ from airflow.utils.trigger_rule import TriggerRule
# [START howto_data_fusion_env_variables]
SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
LOCATION = "europe-north1"
-DAG_ID = "example_data_fusion"
-INSTANCE_NAME = "test-instance"
+DAG_ID = "example_datafusion"
+INSTANCE_NAME = f"df-{ENV_ID}".replace("_", "-")
INSTANCE = {
"type": "BASIC",
"displayName": INSTANCE_NAME,
"dataprocServiceAccount": SERVICE_ACCOUNT,
}
-BUCKET_NAME_1 = "test-datafusion-1"
-BUCKET_NAME_2 = "test-datafusion-2"
+BUCKET_NAME_1 = f"bucket1-{DAG_ID}-{ENV_ID}".replace("_", "-")
+BUCKET_NAME_2 = f"bucket2-{DAG_ID}-{ENV_ID}".replace("_", "-")
BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"
-PIPELINE_NAME = "test-pipe"
+PIPELINE_NAME = f"pipe-{ENV_ID}".replace("_", "-")
PIPELINE = {
"artifact": {
"name": "cdap-data-pipeline",
diff --git a/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py b/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
index 43b1241e59..2606d30ab8 100644
--- a/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
+++ b/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
@@ -41,21 +41,22 @@ from airflow.utils.trigger_rule import TriggerRule
# [START howto_data_fusion_env_variables]
SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-DAG_ID = "example_data_fusion_async"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_datafusion_async"
LOCATION = "europe-north1"
-INSTANCE_NAME = "test-instance-async"
+INSTANCE_NAME = f"async-df-{ENV_ID}".replace("_", "-")
INSTANCE = {
"type": "BASIC",
"displayName": INSTANCE_NAME,
"dataprocServiceAccount": SERVICE_ACCOUNT,
}
-BUCKET_NAME_1 = "test-datafusion-async-1"
-BUCKET_NAME_2 = "test-datafusion-async-2"
+BUCKET_NAME_1 = f"a-bucket1-{DAG_ID}-{ENV_ID}".replace("_", "-")
+BUCKET_NAME_2 = f"a-bucket2-{DAG_ID}-{ENV_ID}".replace("_", "-")
BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"
-PIPELINE_NAME = "test-pipe"
+PIPELINE_NAME = f"pipe-{ENV_ID}".replace("_", "-")
PIPELINE = {
"artifact": {
"name": "cdap-data-pipeline",