This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d9121a7b9e Fix `DataFusionAsyncHook` catch 404 (#32855)
d9121a7b9e is described below

commit d9121a7b9e3d2456c1c3cd9c09d8404501a360c8
Author: VladaZakharova <[email protected]>
AuthorDate: Wed Aug 2 13:42:45 2023 +0200

    Fix `DataFusionAsyncHook` catch 404 (#32855)
---
 airflow/providers/google/cloud/hooks/datafusion.py | 31 +++++++++++++---------
 .../google/cloud/datafusion/example_datafusion.py  | 11 ++++----
 .../cloud/datafusion/example_datafusion_async.py   | 11 ++++----
 3 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/datafusion.py 
b/airflow/providers/google/cloud/hooks/datafusion.py
index 770e3da391..de426bc242 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -511,17 +511,25 @@ class DataFusionAsyncHook(GoogleBaseAsyncHook):
         return urljoin(f"{instance_url}/", 
f"v3/namespaces/{quote(namespace)}/apps/")
 
     async def _get_link(self, url: str, session):
-        async with Token(scopes=self.scopes) as token:
-            session_aio = AioSession(session)
-            headers = {
-                "Authorization": f"Bearer {await token.get()}",
-            }
-            try:
-                pipeline = await session_aio.get(url=url, headers=headers)
-            except AirflowException:
-                pass  # Because the pipeline may not be visible in system yet
-
-        return pipeline
+        # Use an exponential sleep generator to retry on 404 in case the pipeline was not retrieved during the first attempt.
+        for time_to_wait in exponential_sleep_generator(initial=10, 
maximum=120):
+            async with Token(scopes=self.scopes) as token:
+                session_aio = AioSession(session)
+                headers = {
+                    "Authorization": f"Bearer {await token.get()}",
+                }
+                try:
+                    pipeline = await session_aio.get(url=url, headers=headers)
+                    break
+                except Exception as exc:
+                    if "404" in str(exc):
+                        sleep(time_to_wait)
+                    else:
+                        raise
+        if pipeline:
+            return pipeline
+        else:
+            raise AirflowException("Could not retrieve pipeline. Aborting.")
 
     async def get_pipeline(
         self,
@@ -567,7 +575,6 @@ class DataFusionAsyncHook(GoogleBaseAsyncHook):
                     pipeline_id=pipeline_id,
                     session=session,
                 )
-                self.log.info("Response pipeline: %s", pipeline)
                 pipeline = await pipeline.json(content_type=None)
                 current_pipeline_state = pipeline["status"]
 
diff --git 
a/tests/system/providers/google/cloud/datafusion/example_datafusion.py 
b/tests/system/providers/google/cloud/datafusion/example_datafusion.py
index 4d3bb7418d..4b26639d15 100644
--- a/tests/system/providers/google/cloud/datafusion/example_datafusion.py
+++ b/tests/system/providers/google/cloud/datafusion/example_datafusion.py
@@ -42,21 +42,22 @@ from airflow.utils.trigger_rule import TriggerRule
 # [START howto_data_fusion_env_variables]
 SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 LOCATION = "europe-north1"
-DAG_ID = "example_data_fusion"
-INSTANCE_NAME = "test-instance"
+DAG_ID = "example_datafusion"
+INSTANCE_NAME = f"df-{ENV_ID}".replace("_", "-")
 INSTANCE = {
     "type": "BASIC",
     "displayName": INSTANCE_NAME,
     "dataprocServiceAccount": SERVICE_ACCOUNT,
 }
 
-BUCKET_NAME_1 = "test-datafusion-1"
-BUCKET_NAME_2 = "test-datafusion-2"
+BUCKET_NAME_1 = f"bucket1-{DAG_ID}-{ENV_ID}".replace("_", "-")
+BUCKET_NAME_2 = f"bucket2-{DAG_ID}-{ENV_ID}".replace("_", "-")
 BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
 BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"
 
-PIPELINE_NAME = "test-pipe"
+PIPELINE_NAME = f"pipe-{ENV_ID}".replace("_", "-")
 PIPELINE = {
     "artifact": {
         "name": "cdap-data-pipeline",
diff --git 
a/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py 
b/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
index 43b1241e59..2606d30ab8 100644
--- a/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
+++ b/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
@@ -41,21 +41,22 @@ from airflow.utils.trigger_rule import TriggerRule
 # [START howto_data_fusion_env_variables]
 SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-DAG_ID = "example_data_fusion_async"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_datafusion_async"
 LOCATION = "europe-north1"
-INSTANCE_NAME = "test-instance-async"
+INSTANCE_NAME = f"async-df-{ENV_ID}".replace("_", "-")
 INSTANCE = {
     "type": "BASIC",
     "displayName": INSTANCE_NAME,
     "dataprocServiceAccount": SERVICE_ACCOUNT,
 }
 
-BUCKET_NAME_1 = "test-datafusion-async-1"
-BUCKET_NAME_2 = "test-datafusion-async-2"
+BUCKET_NAME_1 = f"a-bucket1-{DAG_ID}-{ENV_ID}".replace("_", "-")
+BUCKET_NAME_2 = f"a-bucket2-{DAG_ID}-{ENV_ID}".replace("_", "-")
 BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
 BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"
 
-PIPELINE_NAME = "test-pipe"
+PIPELINE_NAME = f"pipe-{ENV_ID}".replace("_", "-")
 PIPELINE = {
     "artifact": {
         "name": "cdap-data-pipeline",

Reply via email to