This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 11564a0297 Fixed the hardcoded default namespace value for GCP Data Fusion links. (#35379)
11564a0297 is described below

commit 11564a0297e3a4791836e26b260ce9428daa387e
Author: synsh <[email protected]>
AuthorDate: Sun Feb 4 05:09:50 2024 +0530

    Fixed the hardcoded default namespace value for GCP Data Fusion links. (#35379)
    
    * update
    
    * Add backward compatibility and cleanup
    
    * fix formatting
---
 airflow/providers/google/cloud/links/datafusion.py     | 17 ++++++++++++-----
 airflow/providers/google/cloud/operators/datafusion.py | 10 +++++++++-
 2 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/google/cloud/links/datafusion.py b/airflow/providers/google/cloud/links/datafusion.py
index 00afd700bd..c31cf09b42 100644
--- a/airflow/providers/google/cloud/links/datafusion.py
+++ b/airflow/providers/google/cloud/links/datafusion.py
@@ -15,7 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""This module contains Google Compute Engine links."""
+"""This module contains Google Data Fusion links."""
 from __future__ import annotations
 
 from typing import TYPE_CHECKING, ClassVar
@@ -30,8 +30,8 @@ if TYPE_CHECKING:
 
 BASE_LINK = "https://console.cloud.google.com/data-fusion"
 DATAFUSION_INSTANCE_LINK = BASE_LINK + "/locations/{region}/instances/{instance_name}?project={project_id}"
-DATAFUSION_PIPELINES_LINK = "{uri}/cdap/ns/default/pipelines"
-DATAFUSION_PIPELINE_LINK = "{uri}/pipelines/ns/default/view/{pipeline_name}"
+DATAFUSION_PIPELINES_LINK = "{uri}/cdap/ns/{namespace}/pipelines"
+DATAFUSION_PIPELINE_LINK = "{uri}/pipelines/ns/{namespace}/view/{pipeline_name}"
 
 
 class BaseGoogleLink(BaseOperatorLink):
@@ -52,10 +52,13 @@ class BaseGoogleLink(BaseOperatorLink):
         ti_key: TaskInstanceKey,
     ) -> str:
         conf = XCom.get_value(key=self.key, ti_key=ti_key)
+
         if not conf:
             return ""
-        if self.format_str.startswith("http"):
-            return self.format_str.format(**conf)
+
+        # Add a default value for the 'namespace' parameter for backward compatibility.
+        conf.setdefault("namespace", "default")
+
         return self.format_str.format(**conf)
 
 
@@ -98,6 +101,7 @@ class DataFusionPipelineLink(BaseGoogleLink):
         task_instance: BaseOperator,
         uri: str,
         pipeline_name: str,
+        namespace: str,
     ):
         task_instance.xcom_push(
             context=context,
@@ -105,6 +109,7 @@ class DataFusionPipelineLink(BaseGoogleLink):
             value={
                 "uri": uri,
                 "pipeline_name": pipeline_name,
+                "namespace": namespace,
             },
         )
 
@@ -121,11 +126,13 @@ class DataFusionPipelinesLink(BaseGoogleLink):
         context: Context,
         task_instance: BaseOperator,
         uri: str,
+        namespace: str,
     ):
         task_instance.xcom_push(
             context=context,
             key=DataFusionPipelinesLink.key,
             value={
                 "uri": uri,
+                "namespace": namespace,
             },
         )
diff --git a/airflow/providers/google/cloud/operators/datafusion.py b/airflow/providers/google/cloud/operators/datafusion.py
index 4f62b82407..bba0206c38 100644
--- a/airflow/providers/google/cloud/operators/datafusion.py
+++ b/airflow/providers/google/cloud/operators/datafusion.py
@@ -537,6 +537,7 @@ class CloudDataFusionCreatePipelineOperator(GoogleCloudBaseOperator):
             task_instance=self,
             uri=instance["serviceEndpoint"],
             pipeline_name=self.pipeline_name,
+            namespace=self.namespace,
         )
         self.log.info("Pipeline %s created", self.pipeline_name)
 
@@ -705,7 +706,12 @@ class CloudDataFusionListPipelinesOperator(GoogleCloudBaseOperator):
         )
         self.log.info("Pipelines: %s", pipelines)
 
-        DataFusionPipelinesLink.persist(context=context, task_instance=self, uri=service_endpoint)
+        DataFusionPipelinesLink.persist(
+            context=context,
+            task_instance=self,
+            uri=service_endpoint,
+            namespace=self.namespace,
+        )
         return pipelines
 
 
@@ -825,6 +831,7 @@ class CloudDataFusionStartPipelineOperator(GoogleCloudBaseOperator):
             task_instance=self,
             uri=instance["serviceEndpoint"],
             pipeline_name=self.pipeline_name,
+            namespace=self.namespace,
         )
 
         if self.deferrable:
@@ -954,6 +961,7 @@ class CloudDataFusionStopPipelineOperator(GoogleCloudBaseOperator):
             task_instance=self,
             uri=instance["serviceEndpoint"],
             pipeline_name=self.pipeline_name,
+            namespace=self.namespace,
         )
         hook.stop_pipeline(
             pipeline_name=self.pipeline_name,

Reply via email to