MarcusCramer91 opened a new issue, #55996:
URL: https://github.com/apache/airflow/issues/55996

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google==18.0.0
   
   ### Apache Airflow version
   
   3.0.6
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   When using the CloudRunExecuteJobOperator with deferred=True, the following 
error appears in the logs:
   
   ```
   ERROR - Top level error: source="task"
   ValueError: dictionary update sequence element #0 has length 1; 2 is required
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
 line 1353 in main
   
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
 line 1291 in finalize
   
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/links/base.py",
 line 112 in get_link
   
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/links/base.py",
 line 92 in get_config
   ```
   
   It seems this error comes from the operator writing `log_uri` as plain 
string, while `BaseGoogleLink.get_config` seems to expect only dictionary 
entries. When monkey-patching `get_config` like this
   
   ```
   def _patched(self, operator, ti_key):
       conf = {}
       conf.update(getattr(operator, "extra_links_params", {}))
       from airflow.sdk.execution_time.xcom import XCom
   
       x = XCom.get_value(key=self.key, ti_key=ti_key) or {}
       if isinstance(x, str):
           x = {self.key: x}
       conf.update(x)
       # if the config did not define, return None to stop URL formatting
       if not conf:
           return None
   
       # Add a default value for the 'namespace' parameter for backward 
compatibility.
       # This is for datafusion
       conf.setdefault("namespace", "default")
       return conf
   
   BaseGoogleLink.get_config = _patched
   ```
   
   the error disappears. Not sure if it's actually the root course or the best 
approach to handle this.
   
   ### What you think should happen instead
   
   No error (whether that means writing a dictionary to xcom, or handling the 
`get_config` differently, I am not sure).
   
   ### How to reproduce
   
   Simple example DAG:
   
   ```
   @dag(dag_id="test")
   def get_dag() -> None:
       op = CloudRunExecuteJobOperator(
           task_id="test-job",
           project_id="my-project",
           region="my-region",
           job_name="my-job-name",
           deferrable=True,
       )
       op
   
   get_dag()
   ```
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to