This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 11564a0297 Fixed the hardcoded default namespace value for GCP Data
Fusion links. (#35379)
11564a0297 is described below
commit 11564a0297e3a4791836e26b260ce9428daa387e
Author: synsh <[email protected]>
AuthorDate: Sun Feb 4 05:09:50 2024 +0530
Fixed the hardcoded default namespace value for GCP Data Fusion links.
(#35379)
* update
* Add backward compatibility and cleanup
* fix formatting
---
airflow/providers/google/cloud/links/datafusion.py | 17 ++++++++++++-----
airflow/providers/google/cloud/operators/datafusion.py | 10 +++++++++-
2 files changed, 21 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/google/cloud/links/datafusion.py
b/airflow/providers/google/cloud/links/datafusion.py
index 00afd700bd..c31cf09b42 100644
--- a/airflow/providers/google/cloud/links/datafusion.py
+++ b/airflow/providers/google/cloud/links/datafusion.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""This module contains Google Compute Engine links."""
+"""This module contains Google Data Fusion links."""
from __future__ import annotations
from typing import TYPE_CHECKING, ClassVar
@@ -30,8 +30,8 @@ if TYPE_CHECKING:
BASE_LINK = "https://console.cloud.google.com/data-fusion"
DATAFUSION_INSTANCE_LINK = BASE_LINK +
"/locations/{region}/instances/{instance_name}?project={project_id}"
-DATAFUSION_PIPELINES_LINK = "{uri}/cdap/ns/default/pipelines"
-DATAFUSION_PIPELINE_LINK = "{uri}/pipelines/ns/default/view/{pipeline_name}"
+DATAFUSION_PIPELINES_LINK = "{uri}/cdap/ns/{namespace}/pipelines"
+DATAFUSION_PIPELINE_LINK =
"{uri}/pipelines/ns/{namespace}/view/{pipeline_name}"
class BaseGoogleLink(BaseOperatorLink):
@@ -52,10 +52,13 @@ class BaseGoogleLink(BaseOperatorLink):
ti_key: TaskInstanceKey,
) -> str:
conf = XCom.get_value(key=self.key, ti_key=ti_key)
+
if not conf:
return ""
- if self.format_str.startswith("http"):
- return self.format_str.format(**conf)
+
+ # Add a default value for the 'namespace' parameter for backward
compatibility.
+ conf.setdefault("namespace", "default")
+
return self.format_str.format(**conf)
@@ -98,6 +101,7 @@ class DataFusionPipelineLink(BaseGoogleLink):
task_instance: BaseOperator,
uri: str,
pipeline_name: str,
+ namespace: str,
):
task_instance.xcom_push(
context=context,
@@ -105,6 +109,7 @@ class DataFusionPipelineLink(BaseGoogleLink):
value={
"uri": uri,
"pipeline_name": pipeline_name,
+ "namespace": namespace,
},
)
@@ -121,11 +126,13 @@ class DataFusionPipelinesLink(BaseGoogleLink):
context: Context,
task_instance: BaseOperator,
uri: str,
+ namespace: str,
):
task_instance.xcom_push(
context=context,
key=DataFusionPipelinesLink.key,
value={
"uri": uri,
+ "namespace": namespace,
},
)
diff --git a/airflow/providers/google/cloud/operators/datafusion.py
b/airflow/providers/google/cloud/operators/datafusion.py
index 4f62b82407..bba0206c38 100644
--- a/airflow/providers/google/cloud/operators/datafusion.py
+++ b/airflow/providers/google/cloud/operators/datafusion.py
@@ -537,6 +537,7 @@ class
CloudDataFusionCreatePipelineOperator(GoogleCloudBaseOperator):
task_instance=self,
uri=instance["serviceEndpoint"],
pipeline_name=self.pipeline_name,
+ namespace=self.namespace,
)
self.log.info("Pipeline %s created", self.pipeline_name)
@@ -705,7 +706,12 @@ class
CloudDataFusionListPipelinesOperator(GoogleCloudBaseOperator):
)
self.log.info("Pipelines: %s", pipelines)
- DataFusionPipelinesLink.persist(context=context, task_instance=self,
uri=service_endpoint)
+ DataFusionPipelinesLink.persist(
+ context=context,
+ task_instance=self,
+ uri=service_endpoint,
+ namespace=self.namespace,
+ )
return pipelines
@@ -825,6 +831,7 @@ class
CloudDataFusionStartPipelineOperator(GoogleCloudBaseOperator):
task_instance=self,
uri=instance["serviceEndpoint"],
pipeline_name=self.pipeline_name,
+ namespace=self.namespace,
)
if self.deferrable:
@@ -954,6 +961,7 @@ class
CloudDataFusionStopPipelineOperator(GoogleCloudBaseOperator):
task_instance=self,
uri=instance["serviceEndpoint"],
pipeline_name=self.pipeline_name,
+ namespace=self.namespace,
)
hook.stop_pipeline(
pipeline_name=self.pipeline_name,