MarcusCramer91 opened a new issue, #55996:
URL: https://github.com/apache/airflow/issues/55996
### Apache Airflow Provider(s)
google
### Versions of Apache Airflow Providers
apache-airflow-providers-google==18.0.0
### Apache Airflow version
3.0.6
### Operating System
Debian GNU/Linux 12 (bookworm)
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### What happened
When using the `CloudRunExecuteJobOperator` with `deferrable=True`, the following
error appears in the logs:
```
ERROR - Top level error: source="task"
ValueError: dictionary update sequence element #0 has length 1; 2 is required
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1353 in main
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1291 in finalize
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/links/base.py", line 112 in get_link
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/links/base.py", line 92 in get_config
```
The error appears to come from the operator writing `log_uri` to XCom as a plain
string, while `BaseGoogleLink.get_config` seems to expect a dictionary.
When monkey-patching `get_config` like this
```
from airflow.providers.google.cloud.links.base import BaseGoogleLink


def _patched(self, operator, ti_key):
    conf = {}
    conf.update(getattr(operator, "extra_links_params", {}))
    from airflow.sdk.execution_time.xcom import XCom

    x = XCom.get_value(key=self.key, ti_key=ti_key) or {}
    # Workaround: wrap a plain-string XCom value in a dict before merging
    if isinstance(x, str):
        x = {self.key: x}
    conf.update(x)
    # if the config did not define, return None to stop URL formatting
    if not conf:
        return None
    # Add a default value for the 'namespace' parameter for backward compatibility.
    # This is for datafusion
    conf.setdefault("namespace", "default")
    return conf


BaseGoogleLink.get_config = _patched
```
the error disappears. I am not sure whether this is the actual root cause or the
best way to handle it.
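
The `ValueError` itself can be reproduced without Airflow. The following is a minimal sketch, assuming the value read back from XCom is a plain string; the URL is a made-up placeholder, not a value taken from the logs above.

```python
# Placeholder standing in for whatever string the operator pushed to XCom
# (assumption: the real value is some URI string).
xcom_value = "https://example.com/logs"

conf = {}
try:
    # dict.update() treats a str as an iterable of key/value pairs. Each
    # element is a 1-character string, so it cannot be unpacked into a pair.
    conf.update(xcom_value)
except ValueError as exc:
    error_message = str(exc)
    print(error_message)
```

This is consistent with the message in the traceback, although it does not prove that `log_uri` is the value involved.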
### What you think should happen instead
No error. I am not sure whether that means writing a dictionary to XCom or
handling non-dictionary values in `get_config`.
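
As a rough illustration of the second option, the sketch below coerces whatever was read from XCom into a dictionary before it is merged. `normalize_link_config` is a hypothetical helper written for this issue, not part of the Google provider, and the sample URI is a placeholder.

```python
from typing import Any


def normalize_link_config(value: Any, key: str) -> dict[str, Any]:
    """Hypothetical helper: coerce an XCom value into a dict of link params."""
    if value is None:
        return {}
    if isinstance(value, dict):
        # Already in the expected shape; copy so the caller can mutate it.
        return dict(value)
    # Any other value (e.g. a bare string) is wrapped under the given key.
    return {key: value}


conf = {}
conf.update(normalize_link_config("https://example.com/logs", "log_uri"))
print(conf)
```

The first option would instead have the operator push a dictionary such as `{"log_uri": ...}`, in which case no coercion is needed on the read side.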
### How to reproduce
Simple example DAG:
```
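from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator
from airflow.sdk import dag


@dag(dag_id="test")
def get_dag() -> None:
    # Deferrable mode is what triggers the error
    CloudRunExecuteJobOperator(
        task_id="test-job",
        project_id="my-project",
        region="my-region",
        job_name="my-job-name",
        deferrable=True,
    )


get_dag()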
```
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)