piffarettig opened a new issue, #30494:
URL: https://github.com/apache/airflow/issues/30494

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   `apache-airflow-providers-google==8.11.0`
   
   ### Apache Airflow version
   
   2.5.2
   
   ### Operating System
   
   Debian GNU/Linux
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   I'm using a PythonOperator that executes some SQL and makes the results available to a subsequent task via XComs, since there's no default operator for Cloud Spanner that gives me that for free.
   
   Why do I need to do this? Because I need to DROP all Spanner tables that match a given prefix, so I can't anticipate the table names; they're dynamic, and I only know their prefix.
   
   However, my code fails when I do `with database.snapshot() as snapshot:`. I get:
   
   > TypeError: Expected int, bytes, or behavior, got <class 'grpc_gcp_pb2.ApiConfig'>
   
   That's the exact same error mentioned 
[here](https://github.com/apache/beam/issues/22454).
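   
   If it helps narrow this down, the same call path can be exercised with the Spanner client library directly, outside of Airflow. This is only a minimal sketch with placeholder IDs, not my exact code:
   
   ```python
   # Minimal sketch using google-cloud-spanner directly (the IDs are placeholders).
   # database.snapshot() below is the same call that raises the TypeError in my task.
   from google.cloud import spanner
   
   client = spanner.Client(project="your-project-id")
   instance = client.instance("your-instance-id")
   database = instance.database("your-database-id")
   
   with database.snapshot() as snapshot:
       for row in snapshot.execute_sql("SELECT 1"):
           print(row)
   ```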
   
   And given that my Airflow version is 2.5.2, the Google provider requires:
   
   - `google-api-core>=2.7.0,<3.0.0`
   - `google-cloud-spanner>=1.10.0,<2.0.0`
   
   Which resolves to:
   
   - `google-api-core==2.8.2`
   - `google-cloud-spanner==1.19.3`
   
   Those are the exact same dependency versions mentioned in the issue above, and they don't seem to change in more recent Airflow versions.
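   
   A quick way to double-check what actually got installed inside the container (the distribution names below are my assumption of the relevant packages; as far as I can tell, `grpcio-gcp` is what ships the `grpc_gcp_pb2` module named in the traceback):
   
   ```python
   # Dump the installed versions of the packages involved (names are assumptions).
   from importlib.metadata import PackageNotFoundError, version
   
   for dist in (
       "apache-airflow-providers-google",
       "google-api-core",
       "google-cloud-spanner",
       "grpcio",
       "grpcio-gcp",  # assumed source of grpc_gcp_pb2, the class in the error
   ):
       try:
           print(dist, version(dist))
       except PackageNotFoundError:
           print(dist, "not installed")
   ```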
   
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   Add the following task to a DAG:
   
   ```python
   from airflow.exceptions import AirflowException
   from airflow.operators.python import PythonOperator
   from airflow.providers.google.cloud.hooks.spanner import SpannerHook
   
   
   def my_task() -> PythonOperator:
       task_id = "generate-spanner-drop-statements"
       return PythonOperator(
           task_id=task_id,
           python_callable=_delete_old_spanner_tables,
           provide_context=True,
           op_kwargs={
               "table_prefix": "your table prefix",
               "project_id": "your project id",
               "instance_id": "your instance id",
               "database_id": "your database id"
           },
       )
       
   def _delete_old_spanner_tables(table_prefix: str, project_id: str, instance_id: str, database_id: str, **kwargs):
       hook = SpannerHook()
       database = hook.get_database(
           project_id=project_id,
           instance_id=instance_id,
           database_id=database_id
       )
       
       if not database:
           raise AirflowException(
               f"The Cloud Spanner database '{database_id}' in project '{project_id}' "
               f"and instance '{instance_id}' is missing."
           )
   
       # Select statement that gives me the drop statements of the tables and indexes I want to delete
       with open("sql/drop_old_spanner_tables.sql", "r") as f:
           contents = f.read()
       statement = contents.replace("{{ table_prefix }}", table_prefix)
   
       with database.snapshot() as snapshot:
           result = snapshot.execute_sql(statement)
           rows = list()
           for row in result:
               rows.append(row)
           task_instance = kwargs['task_instance']
           task_instance.xcom_push(key='drop_statements', value=rows)
   ```
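   
   For context, the pushed `drop_statements` value is meant to be consumed by a follow-up task. The sketch below is only illustrative of that consumer, not the exact code I run, and it assumes each row returned by the SELECT carries a single DROP statement in its first column:
   
   ```python
   def _run_drop_statements(project_id: str, instance_id: str, database_id: str, **kwargs):
       task_instance = kwargs["task_instance"]
       rows = task_instance.xcom_pull(
           task_ids="generate-spanner-drop-statements", key="drop_statements"
       )
       # Assumption: each row holds one DROP statement in its first column.
       ddl_statements = [row[0] for row in rows]
   
       hook = SpannerHook()
       database = hook.get_database(
           project_id=project_id, instance_id=instance_id, database_id=database_id
       )
       # update_ddl() is the google-cloud-spanner Database method for applying DDL;
       # .result() blocks until the long-running operation finishes.
       database.update_ddl(ddl_statements).result()
   ```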
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

